New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC – NEMO

source: NEMO/branches/2018/dev_r10164_HPC09_ESIWACE_PREP_MERGE/src/OCE/LBC/lib_mpp.F90 @ 10337

Last change on this file since 10337 was 10337, checked in by smasson, 5 years ago

dev_r10164_HPC09_ESIWACE_PREP_MERGE: action 3d: force safe NPF (2 lines update) for field recieved from oasis, see #2133

  • Property svn:keywords set to Id
File size: 78.8 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add mpp_lnk_bdy_3d/2d routines to optimize the BDY comm.
22   !!            3.5  !  2013  (C. Ethe, G. Madec)  message passing arrays as local variables
23   !!            3.5  !  2013  (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
24   !!            3.6  !  2015  (O. Tintó and M. Castrillo - BSC) Added '_multiple' case for 2D lbc and max
25   !!            4.0  !  2017  (G. Madec) automatique allocation of array argument (use any 3rd dimension)
26   !!             -   !  2017  (G. Madec) create generic.h90 files to generate all lbc and north fold routines
27   !!----------------------------------------------------------------------
28
29   !!----------------------------------------------------------------------
30   !!   ctl_stop      : update momentum and tracer Kz from a tke scheme
31   !!   ctl_warn      : initialization, namelist read, and parameters control
32   !!   ctl_opn       : Open file and check if required file is available.
33   !!   ctl_nam       : Prints informations when an error occurs while reading a namelist
34   !!   get_unit      : give the index of an unused logical unit
35   !!----------------------------------------------------------------------
36#if   defined key_mpp_mpi
37   !!----------------------------------------------------------------------
38   !!   'key_mpp_mpi'             MPI massively parallel processing library
39   !!----------------------------------------------------------------------
40   !!   lib_mpp_alloc : allocate mpp arrays
41   !!   mynode        : indentify the processor unit
42   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
43   !!   mpp_lnk_icb   : interface for message passing of 2d arrays with extra halo for icebergs (mpp_lnk_2d_icb)
44   !!   mpprecv       :
45   !!   mppsend       :
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north_icb : alternative to mpp_nfd for extra outer halo with icebergs
57   !!----------------------------------------------------------------------
58   USE dom_oce        ! ocean space and time domain
59   USE lbcnfd         ! north fold treatment
60   USE in_out_manager ! I/O manager
61
62   IMPLICIT NONE
63   PRIVATE
64
65   INTERFACE mpp_nfd
66      MODULE PROCEDURE   mpp_nfd_2d      , mpp_nfd_3d      , mpp_nfd_4d
67      MODULE PROCEDURE   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
68   END INTERFACE
69
70   ! Interface associated to the mpp_lnk_... routines is defined in lbclnk
71   PUBLIC   mpp_lnk_2d      , mpp_lnk_3d      , mpp_lnk_4d
72   PUBLIC   mpp_lnk_2d_ptr, mpp_lnk_3d_ptr, mpp_lnk_4d_ptr
73   !
74!!gm  this should be useless
75   PUBLIC   mpp_nfd_2d    , mpp_nfd_3d    , mpp_nfd_4d
76   PUBLIC   mpp_nfd_2d_ptr, mpp_nfd_3d_ptr, mpp_nfd_4d_ptr
77!!gm end
78   !
79   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
80   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
81   PUBLIC   mpp_ini_north
82   PUBLIC   mpp_lnk_2d_icb
83   PUBLIC   mpp_lbc_north_icb
84   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
85   PUBLIC   mpp_ilor
86   PUBLIC   mppscatter, mppgather
87   PUBLIC   mpp_ini_znl
88   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
89   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d, mpp_lnk_bdy_4d
90   
91   !! * Interfaces
92   !! define generic interface for these routine as they are called sometimes
93   !! with scalar arguments instead of array arguments, which causes problems
94   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
95   INTERFACE mpp_min
96      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
97   END INTERFACE
98   INTERFACE mpp_max
99      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
100   END INTERFACE
101   INTERFACE mpp_sum
102      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real,   &
103         &             mppsum_realdd, mppsum_a_realdd
104   END INTERFACE
105   INTERFACE mpp_minloc
106      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
107   END INTERFACE
108   INTERFACE mpp_maxloc
109      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
110   END INTERFACE
111
112   !! ========================= !!
113   !!  MPI  variable definition !!
114   !! ========================= !!
115!$AGRIF_DO_NOT_TREAT
116   INCLUDE 'mpif.h'
117!$AGRIF_END_DO_NOT_TREAT
118
119   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
120
121   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
122
123   INTEGER, PUBLIC ::   mppsize        ! number of process
124   INTEGER, PUBLIC ::   mpprank        ! process number  [ 0 - size-1 ]
125!$AGRIF_DO_NOT_TREAT
126   INTEGER, PUBLIC ::   mpi_comm_oce   ! opa local communicator
127!$AGRIF_END_DO_NOT_TREAT
128
129   INTEGER :: MPI_SUMDD
130
131   ! variables used for zonal integration
132   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
133   LOGICAL, PUBLIC ::   l_znl_root      !: True on the 'left'most processor on the same row
134   INTEGER         ::   ngrp_znl        !  group ID for the znl processors
135   INTEGER         ::   ndim_rank_znl   !  number of processors on the same zonal average
136   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
137
138   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
139   INTEGER, PUBLIC ::   ngrp_world        !: group ID for the world processors
140   INTEGER, PUBLIC ::   ngrp_opa          !: group ID for the opa processors
141   INTEGER, PUBLIC ::   ngrp_north        !: group ID for the northern processors (to be fold)
142   INTEGER, PUBLIC ::   ncomm_north       !: communicator made by the processors belonging to ngrp_north
143   INTEGER, PUBLIC ::   ndim_rank_north   !: number of 'sea' processor in the northern line (can be /= jpni !)
144   INTEGER, PUBLIC ::   njmppmax          !: value of njmpp for the processors of the northern line
145   INTEGER, PUBLIC ::   north_root        !: number (in the comm_opa) of proc 0 in the northern comm
146   INTEGER, PUBLIC, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_north   !: dimension ndim_rank_north
147
148   ! Type of send : standard, buffered, immediate
149   CHARACTER(len=1), PUBLIC ::   cn_mpi_send        !: type od mpi send/recieve (S=standard, B=bsend, I=isend)
150   LOGICAL         , PUBLIC ::   l_isend = .FALSE.  !: isend use indicator (T if cn_mpi_send='I')
151   INTEGER         , PUBLIC ::   nn_buffer          !: size of the buffer in case of mpi_bsend
152
153   ! Communications summary report
154   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_lbc                   !: names of lbc_lnk calling routines
155   CHARACTER(len=128), DIMENSION(:), ALLOCATABLE ::   crname_glb                   !: names of global comm  calling routines
156   INTEGER, PUBLIC                               ::   ncom_stp = 0                 !: copy of time step # istp
157   INTEGER, PUBLIC                               ::   ncom_fsbc = 1                !: copy of sbc time step # nn_fsbc
158   INTEGER, PUBLIC                               ::   ncom_dttrc = 1               !: copy of top time step # nn_dttrc
159   INTEGER, PUBLIC                               ::   ncom_freq                    !: frequency of comm diagnostic
160   INTEGER, PUBLIC , DIMENSION(:,:), ALLOCATABLE ::   ncomm_sequence               !: size of communicated arrays (halos)
161   INTEGER, PARAMETER, PUBLIC                    ::   ncom_rec_max = 2000          !: max number of communication record
162   INTEGER, PUBLIC                               ::   n_sequence_lbc = 0           !: # of communicated arraysvia lbc
163   INTEGER, PUBLIC                               ::   n_sequence_glb = 0           !: # of global communications
164   INTEGER, PUBLIC                               ::   numcom = -1                  !: logical unit for communicaton report
165   INTEGER, PUBLIC                               ::   l_full_nf_update = .FALSE.   !: logical for a full (2lines) update of bc at North fold report
166
167   ! timing summary report
168   REAL(wp), DIMENSION(2), PUBLIC ::  waiting_time = 0._wp
169   REAL(wp)              , PUBLIC ::  compute_time = 0._wp, elapsed_time = 0._wp
170   
171   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE ::   tampon   ! buffer in case of bsend
172
173   LOGICAL, PUBLIC ::   ln_nnogather                !: namelist control of northfold comms
174   LOGICAL, PUBLIC ::   l_north_nogather = .FALSE.  !: internal control of northfold comms
175
176   !!----------------------------------------------------------------------
177   !! NEMO/OCE 4.0 , NEMO Consortium (2018)
178   !! $Id$
179   !! Software governed by the CeCILL license (see ./LICENSE)
180   !!----------------------------------------------------------------------
181CONTAINS
182
183   FUNCTION mynode( ldtxt, ldname, kumnam_ref, kumnam_cfg, kumond, kstop, localComm )
184      !!----------------------------------------------------------------------
185      !!                  ***  routine mynode  ***
186      !!
187      !! ** Purpose :   Find processor unit
188      !!----------------------------------------------------------------------
189      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt        !
190      CHARACTER(len=*)             , INTENT(in   ) ::   ldname       !
191      INTEGER                      , INTENT(in   ) ::   kumnam_ref   ! logical unit for reference namelist
192      INTEGER                      , INTENT(in   ) ::   kumnam_cfg   ! logical unit for configuration namelist
193      INTEGER                      , INTENT(inout) ::   kumond       ! logical unit for namelist output
194      INTEGER                      , INTENT(inout) ::   kstop        ! stop indicator
195      INTEGER         , OPTIONAL   , INTENT(in   ) ::   localComm    !
196      !
197      INTEGER ::   mynode, ierr, code, ji, ii, ios
198      LOGICAL ::   mpi_was_called
199      !
200      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
201      !!----------------------------------------------------------------------
202      !
203      ii = 1
204      WRITE(ldtxt(ii),*)                                                                  ;   ii = ii + 1
205      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                    ;   ii = ii + 1
206      WRITE(ldtxt(ii),*) '~~~~~~ '                                                        ;   ii = ii + 1
207      !
208      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
209      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
210901   IF( ios /= 0 )   CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
211      !
212      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
213      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
214902   IF( ios >  0 )   CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
215      !
216      !                              ! control print
217      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                             ;   ii = ii + 1
218      WRITE(ldtxt(ii),*) '      mpi send type          cn_mpi_send = ', cn_mpi_send       ;   ii = ii + 1
219      WRITE(ldtxt(ii),*) '      size exported buffer   nn_buffer   = ', nn_buffer,' bytes';   ii = ii + 1
220      !
221      IF( jpni < 1 .OR. jpnj < 1  ) THEN
222         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically' ;   ii = ii + 1
223      ELSE
224         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni       ;   ii = ii + 1
225         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj       ;   ii = ii + 1
226      ENDIF
227
228      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
229
230      CALL mpi_initialized ( mpi_was_called, code )
231      IF( code /= MPI_SUCCESS ) THEN
232         DO ji = 1, SIZE(ldtxt)
233            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
234         END DO
235         WRITE(*, cform_err)
236         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
237         CALL mpi_abort( mpi_comm_world, code, ierr )
238      ENDIF
239
240      IF( mpi_was_called ) THEN
241         !
242         SELECT CASE ( cn_mpi_send )
243         CASE ( 'S' )                ! Standard mpi send (blocking)
244            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
245         CASE ( 'B' )                ! Buffer mpi send (blocking)
246            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
247            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
248         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
249            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
250            l_isend = .TRUE.
251         CASE DEFAULT
252            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
253            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
254            kstop = kstop + 1
255         END SELECT
256         !
257      ELSEIF ( PRESENT(localComm) .AND. .NOT. mpi_was_called ) THEN
258         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '          ;   ii = ii + 1
259         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                ;   ii = ii + 1
260         kstop = kstop + 1
261      ELSE
262         SELECT CASE ( cn_mpi_send )
263         CASE ( 'S' )                ! Standard mpi send (blocking)
264            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'             ;   ii = ii + 1
265            CALL mpi_init( ierr )
266         CASE ( 'B' )                ! Buffer mpi send (blocking)
267            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'              ;   ii = ii + 1
268            IF( Agrif_Root() )   CALL mpi_init_oce( ldtxt, ii, ierr )
269         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
270            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'           ;   ii = ii + 1
271            l_isend = .TRUE.
272            CALL mpi_init( ierr )
273         CASE DEFAULT
274            WRITE(ldtxt(ii),cform_err)                                                    ;   ii = ii + 1
275            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send     ;   ii = ii + 1
276            kstop = kstop + 1
277         END SELECT
278         !
279      ENDIF
280
281      IF( PRESENT(localComm) ) THEN
282         IF( Agrif_Root() ) THEN
283            mpi_comm_oce = localComm
284         ENDIF
285      ELSE
286         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code)
287         IF( code /= MPI_SUCCESS ) THEN
288            DO ji = 1, SIZE(ldtxt)
289               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
290            END DO
291            WRITE(*, cform_err)
292            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
293            CALL mpi_abort( mpi_comm_world, code, ierr )
294         ENDIF
295      ENDIF
296
297#if defined key_agrif
298      IF( Agrif_Root() ) THEN
299         CALL Agrif_MPI_Init(mpi_comm_oce)
300      ELSE
301         CALL Agrif_MPI_set_grid_comm(mpi_comm_oce)
302      ENDIF
303#endif
304
305      CALL mpi_comm_rank( mpi_comm_oce, mpprank, ierr )
306      CALL mpi_comm_size( mpi_comm_oce, mppsize, ierr )
307      mynode = mpprank
308
309      IF( mynode == 0 ) THEN
310         CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
311         WRITE(kumond, nammpp)     
312      ENDIF
313      !
314      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
315      !
316   END FUNCTION mynode
317
318   !!----------------------------------------------------------------------
319   !!                   ***  routine mpp_lnk_(2,3,4)d  ***
320   !!
321   !!   * Argument : dummy argument use in mpp_lnk_... routines
322   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
323   !!                cd_nat :   nature of array grid-points
324   !!                psgn   :   sign used across the north fold boundary
325   !!                kfld   :   optional, number of pt3d arrays
326   !!                cd_mpp :   optional, fill the overlap area only
327   !!                pval   :   optional, background value (used at closed boundaries)
328   !!----------------------------------------------------------------------
329   !
330   !                       !==  2D array and array of 2D pointer  ==!
331   !
332#  define DIM_2d
333#     define ROUTINE_LNK           mpp_lnk_2d
334#     include "mpp_lnk_generic.h90"
335#     undef ROUTINE_LNK
336#     define MULTI
337#     define ROUTINE_LNK           mpp_lnk_2d_ptr
338#     include "mpp_lnk_generic.h90"
339#     undef ROUTINE_LNK
340#     undef MULTI
341#  undef DIM_2d
342   !
343   !                       !==  3D array and array of 3D pointer  ==!
344   !
345#  define DIM_3d
346#     define ROUTINE_LNK           mpp_lnk_3d
347#     include "mpp_lnk_generic.h90"
348#     undef ROUTINE_LNK
349#     define MULTI
350#     define ROUTINE_LNK           mpp_lnk_3d_ptr
351#     include "mpp_lnk_generic.h90"
352#     undef ROUTINE_LNK
353#     undef MULTI
354#  undef DIM_3d
355   !
356   !                       !==  4D array and array of 4D pointer  ==!
357   !
358#  define DIM_4d
359#     define ROUTINE_LNK           mpp_lnk_4d
360#     include "mpp_lnk_generic.h90"
361#     undef ROUTINE_LNK
362#     define MULTI
363#     define ROUTINE_LNK           mpp_lnk_4d_ptr
364#     include "mpp_lnk_generic.h90"
365#     undef ROUTINE_LNK
366#     undef MULTI
367#  undef DIM_4d
368
369   !!----------------------------------------------------------------------
370   !!                   ***  routine mpp_nfd_(2,3,4)d  ***
371   !!
372   !!   * Argument : dummy argument use in mpp_nfd_... routines
373   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
374   !!                cd_nat :   nature of array grid-points
375   !!                psgn   :   sign used across the north fold boundary
376   !!                kfld   :   optional, number of pt3d arrays
377   !!                cd_mpp :   optional, fill the overlap area only
378   !!                pval   :   optional, background value (used at closed boundaries)
379   !!----------------------------------------------------------------------
380   !
381   !                       !==  2D array and array of 2D pointer  ==!
382   !
383#  define DIM_2d
384#     define ROUTINE_NFD           mpp_nfd_2d
385#     include "mpp_nfd_generic.h90"
386#     undef ROUTINE_NFD
387#     define MULTI
388#     define ROUTINE_NFD           mpp_nfd_2d_ptr
389#     include "mpp_nfd_generic.h90"
390#     undef ROUTINE_NFD
391#     undef MULTI
392#  undef DIM_2d
393   !
394   !                       !==  3D array and array of 3D pointer  ==!
395   !
396#  define DIM_3d
397#     define ROUTINE_NFD           mpp_nfd_3d
398#     include "mpp_nfd_generic.h90"
399#     undef ROUTINE_NFD
400#     define MULTI
401#     define ROUTINE_NFD           mpp_nfd_3d_ptr
402#     include "mpp_nfd_generic.h90"
403#     undef ROUTINE_NFD
404#     undef MULTI
405#  undef DIM_3d
406   !
407   !                       !==  4D array and array of 4D pointer  ==!
408   !
409#  define DIM_4d
410#     define ROUTINE_NFD           mpp_nfd_4d
411#     include "mpp_nfd_generic.h90"
412#     undef ROUTINE_NFD
413#     define MULTI
414#     define ROUTINE_NFD           mpp_nfd_4d_ptr
415#     include "mpp_nfd_generic.h90"
416#     undef ROUTINE_NFD
417#     undef MULTI
418#  undef DIM_4d
419
420
421   !!----------------------------------------------------------------------
422   !!                   ***  routine mpp_lnk_bdy_(2,3,4)d  ***
423   !!
424   !!   * Argument : dummy argument use in mpp_lnk_... routines
425   !!                ptab   :   array or pointer of arrays on which the boundary condition is applied
426   !!                cd_nat :   nature of array grid-points
427   !!                psgn   :   sign used across the north fold boundary
428   !!                kb_bdy :   BDY boundary set
429   !!                kfld   :   optional, number of pt3d arrays
430   !!----------------------------------------------------------------------
431   !
432   !                       !==  2D array and array of 2D pointer  ==!
433   !
434#  define DIM_2d
435#     define ROUTINE_BDY           mpp_lnk_bdy_2d
436#     include "mpp_bdy_generic.h90"
437#     undef ROUTINE_BDY
438#  undef DIM_2d
439   !
440   !                       !==  3D array and array of 3D pointer  ==!
441   !
442#  define DIM_3d
443#     define ROUTINE_BDY           mpp_lnk_bdy_3d
444#     include "mpp_bdy_generic.h90"
445#     undef ROUTINE_BDY
446#  undef DIM_3d
447   !
448   !                       !==  4D array and array of 4D pointer  ==!
449   !
450#  define DIM_4d
451#     define ROUTINE_BDY           mpp_lnk_bdy_4d
452#     include "mpp_bdy_generic.h90"
453#     undef ROUTINE_BDY
454#  undef DIM_4d
455
456   !!----------------------------------------------------------------------
457   !!
458   !!   load_array  &   mpp_lnk_2d_9    à generaliser a 3D et 4D
459   
460   
461   !!    mpp_lnk_sum_2d et 3D   ====>>>>>>   à virer du code !!!!
462   
463   
464   !!----------------------------------------------------------------------
465
466
467
468   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
469      !!----------------------------------------------------------------------
470      !!                  ***  routine mppsend  ***
471      !!
472      !! ** Purpose :   Send messag passing array
473      !!
474      !!----------------------------------------------------------------------
475      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
476      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
477      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
478      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
479      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
480      !!
481      INTEGER ::   iflag
482      !!----------------------------------------------------------------------
483      !
484      SELECT CASE ( cn_mpi_send )
485      CASE ( 'S' )                ! Standard mpi send (blocking)
486         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
487      CASE ( 'B' )                ! Buffer mpi send (blocking)
488         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce        , iflag )
489      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
490         ! be carefull, one more argument here : the mpi request identifier..
491         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_oce, md_req, iflag )
492      END SELECT
493      !
494   END SUBROUTINE mppsend
495
496
497   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
498      !!----------------------------------------------------------------------
499      !!                  ***  routine mpprecv  ***
500      !!
501      !! ** Purpose :   Receive messag passing array
502      !!
503      !!----------------------------------------------------------------------
504      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
505      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
506      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
507      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
508      !!
509      INTEGER :: istatus(mpi_status_size)
510      INTEGER :: iflag
511      INTEGER :: use_source
512      !!----------------------------------------------------------------------
513      !
514      ! If a specific process number has been passed to the receive call,
515      ! use that one. Default is to use mpi_any_source
516      use_source = mpi_any_source
517      IF( PRESENT(ksource) )   use_source = ksource
518      !
519      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_oce, istatus, iflag )
520      !
521   END SUBROUTINE mpprecv
522
523
524   SUBROUTINE mppgather( ptab, kp, pio )
525      !!----------------------------------------------------------------------
526      !!                   ***  routine mppgather  ***
527      !!
528      !! ** Purpose :   Transfert between a local subdomain array and a work
529      !!     array which is distributed following the vertical level.
530      !!
531      !!----------------------------------------------------------------------
532      REAL(wp), DIMENSION(jpi,jpj)      , INTENT(in   ) ::   ptab   ! subdomain input array
533      INTEGER                           , INTENT(in   ) ::   kp     ! record length
534      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
535      !!
536      INTEGER :: itaille, ierror   ! temporary integer
537      !!---------------------------------------------------------------------
538      !
539      itaille = jpi * jpj
540      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
541         &                            mpi_double_precision, kp , mpi_comm_oce, ierror )
542      !
543   END SUBROUTINE mppgather
544
545
546   SUBROUTINE mppscatter( pio, kp, ptab )
547      !!----------------------------------------------------------------------
548      !!                  ***  routine mppscatter  ***
549      !!
550      !! ** Purpose :   Transfert between awork array which is distributed
551      !!      following the vertical level and the local subdomain array.
552      !!
553      !!----------------------------------------------------------------------
554      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::   pio    ! output array
555      INTEGER                             ::   kp     ! Tag (not used with MPI
556      REAL(wp), DIMENSION(jpi,jpj)        ::   ptab   ! subdomain array input
557      !!
558      INTEGER :: itaille, ierror   ! temporary integer
559      !!---------------------------------------------------------------------
560      !
561      itaille = jpi * jpj
562      !
563      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
564         &                            mpi_double_precision, kp  , mpi_comm_oce, ierror )
565      !
566   END SUBROUTINE mppscatter
567
568   !!
569   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
570      ! WARNING: must be used only once (by ice_dyn_adv_umx) because ll_switch and ireq are SAVE
571      !!----------------------------------------------------------------------
572      LOGICAL, INTENT(inout), DIMENSION(2) ::   ld_switch
573      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
574      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom 
575      INTEGER  ::   ierror, ilocalcomm
576      LOGICAL, SAVE ::   ll_switch 
577      INTEGER, SAVE ::   ireq = -1
578      !!----------------------------------------------------------------------
579      ilocalcomm = mpi_comm_oce
580      IF( PRESENT(kcom) )   ilocalcomm = kcom
581     
582      IF ( ireq /= -1 ) THEN   ! get ld_switch(2) from ll_switch (from previous call)
583         IF( ln_timing ) CALL tic_tac(.TRUE., ld_global = .TRUE.)
584         CALL mpi_wait(ireq, MPI_STATUS_IGNORE, ierror )
585         ld_switch(2) = ll_switch
586         IF( ln_timing ) CALL tic_tac(.FALSE., ld_global = .TRUE.)
587      ENDIF
588      IF( .NOT. ldlast ) &     ! send ll_switch to be received on next call
589         CALL mpi_iallreduce( ld_switch(1), ll_switch, 1, MPI_LOGICAL, mpi_lor, ilocalcomm, ireq, ierror )
590
591   END SUBROUTINE mpp_ilor
592   
593   !!----------------------------------------------------------------------
594   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
595   !!   
596   !!----------------------------------------------------------------------
597   !!
598#  define OPERATION_MAX
599#  define INTEGER_TYPE
600#  define DIM_0d
601#     define ROUTINE_ALLREDUCE           mppmax_int
602#     include "mpp_allreduce_generic.h90"
603#     undef ROUTINE_ALLREDUCE
604#  undef DIM_0d
605#  define DIM_1d
606#     define ROUTINE_ALLREDUCE           mppmax_a_int
607#     include "mpp_allreduce_generic.h90"
608#     undef ROUTINE_ALLREDUCE
609#  undef DIM_1d
610#  undef INTEGER_TYPE
611!
612#  define REAL_TYPE
613#  define DIM_0d
614#     define ROUTINE_ALLREDUCE           mppmax_real
615#     include "mpp_allreduce_generic.h90"
616#     undef ROUTINE_ALLREDUCE
617#  undef DIM_0d
618#  define DIM_1d
619#     define ROUTINE_ALLREDUCE           mppmax_a_real
620#     include "mpp_allreduce_generic.h90"
621#     undef ROUTINE_ALLREDUCE
622#  undef DIM_1d
623#  undef REAL_TYPE
624#  undef OPERATION_MAX
625   !!----------------------------------------------------------------------
626   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
627   !!   
628   !!----------------------------------------------------------------------
629   !!
630#  define OPERATION_MIN
631#  define INTEGER_TYPE
632#  define DIM_0d
633#     define ROUTINE_ALLREDUCE           mppmin_int
634#     include "mpp_allreduce_generic.h90"
635#     undef ROUTINE_ALLREDUCE
636#  undef DIM_0d
637#  define DIM_1d
638#     define ROUTINE_ALLREDUCE           mppmin_a_int
639#     include "mpp_allreduce_generic.h90"
640#     undef ROUTINE_ALLREDUCE
641#  undef DIM_1d
642#  undef INTEGER_TYPE
643!
644#  define REAL_TYPE
645#  define DIM_0d
646#     define ROUTINE_ALLREDUCE           mppmin_real
647#     include "mpp_allreduce_generic.h90"
648#     undef ROUTINE_ALLREDUCE
649#  undef DIM_0d
650#  define DIM_1d
651#     define ROUTINE_ALLREDUCE           mppmin_a_real
652#     include "mpp_allreduce_generic.h90"
653#     undef ROUTINE_ALLREDUCE
654#  undef DIM_1d
655#  undef REAL_TYPE
656#  undef OPERATION_MIN
657
658   !!----------------------------------------------------------------------
659   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
660   !!   
661   !!   Global sum of 1D array or a variable (integer, real or complex)
662   !!----------------------------------------------------------------------
663   !!
664#  define OPERATION_SUM
665#  define INTEGER_TYPE
666#  define DIM_0d
667#     define ROUTINE_ALLREDUCE           mppsum_int
668#     include "mpp_allreduce_generic.h90"
669#     undef ROUTINE_ALLREDUCE
670#  undef DIM_0d
671#  define DIM_1d
672#     define ROUTINE_ALLREDUCE           mppsum_a_int
673#     include "mpp_allreduce_generic.h90"
674#     undef ROUTINE_ALLREDUCE
675#  undef DIM_1d
676#  undef INTEGER_TYPE
677!
678#  define REAL_TYPE
679#  define DIM_0d
680#     define ROUTINE_ALLREDUCE           mppsum_real
681#     include "mpp_allreduce_generic.h90"
682#     undef ROUTINE_ALLREDUCE
683#  undef DIM_0d
684#  define DIM_1d
685#     define ROUTINE_ALLREDUCE           mppsum_a_real
686#     include "mpp_allreduce_generic.h90"
687#     undef ROUTINE_ALLREDUCE
688#  undef DIM_1d
689#  undef REAL_TYPE
690#  undef OPERATION_SUM
691
692#  define OPERATION_SUM_DD
693#  define COMPLEX_TYPE
694#  define DIM_0d
695#     define ROUTINE_ALLREDUCE           mppsum_realdd
696#     include "mpp_allreduce_generic.h90"
697#     undef ROUTINE_ALLREDUCE
698#  undef DIM_0d
699#  define DIM_1d
700#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
701#     include "mpp_allreduce_generic.h90"
702#     undef ROUTINE_ALLREDUCE
703#  undef DIM_1d
704#  undef COMPLEX_TYPE
705#  undef OPERATION_SUM_DD
706
707   !!----------------------------------------------------------------------
708   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
709   !!   
710   !!----------------------------------------------------------------------
711   !!
712#  define OPERATION_MINLOC
713#  define DIM_2d
714#     define ROUTINE_LOC           mpp_minloc2d
715#     include "mpp_loc_generic.h90"
716#     undef ROUTINE_LOC
717#  undef DIM_2d
718#  define DIM_3d
719#     define ROUTINE_LOC           mpp_minloc3d
720#     include "mpp_loc_generic.h90"
721#     undef ROUTINE_LOC
722#  undef DIM_3d
723#  undef OPERATION_MINLOC
724
725#  define OPERATION_MAXLOC
726#  define DIM_2d
727#     define ROUTINE_LOC           mpp_maxloc2d
728#     include "mpp_loc_generic.h90"
729#     undef ROUTINE_LOC
730#  undef DIM_2d
731#  define DIM_3d
732#     define ROUTINE_LOC           mpp_maxloc3d
733#     include "mpp_loc_generic.h90"
734#     undef ROUTINE_LOC
735#  undef DIM_3d
736#  undef OPERATION_MAXLOC
737
738   SUBROUTINE mppsync()
739      !!----------------------------------------------------------------------
740      !!                  ***  routine mppsync  ***
741      !!
742      !! ** Purpose :   Massively parallel processors, synchroneous
743      !!
744      !!-----------------------------------------------------------------------
745      INTEGER :: ierror
746      !!-----------------------------------------------------------------------
747      !
748      CALL mpi_barrier( mpi_comm_oce, ierror )
749      !
750   END SUBROUTINE mppsync
751
752
753   SUBROUTINE mppstop( ldfinal ) 
754      !!----------------------------------------------------------------------
755      !!                  ***  routine mppstop  ***
756      !!
757      !! ** purpose :   Stop massively parallel processors method
758      !!
759      !!----------------------------------------------------------------------
760      LOGICAL, OPTIONAL, INTENT(in) :: ldfinal    ! source process number
761      LOGICAL ::   llfinal
762      INTEGER ::   info
763      !!----------------------------------------------------------------------
764      !
765      CALL mppsync
766      CALL mpi_finalize( info )
767      llfinal = .FALSE.
768      IF( PRESENT(ldfinal) ) llfinal = ldfinal
769      IF( .NOT. llfinal ) STOP 123456
770      !
771   END SUBROUTINE mppstop
772
773
774   SUBROUTINE mpp_comm_free( kcom )
775      !!----------------------------------------------------------------------
776      INTEGER, INTENT(in) ::   kcom
777      !!
778      INTEGER :: ierr
779      !!----------------------------------------------------------------------
780      !
781      CALL MPI_COMM_FREE(kcom, ierr)
782      !
783   END SUBROUTINE mpp_comm_free
784
785
786   SUBROUTINE mpp_ini_znl( kumout )
787      !!----------------------------------------------------------------------
788      !!               ***  routine mpp_ini_znl  ***
789      !!
790      !! ** Purpose :   Initialize special communicator for computing zonal sum
791      !!
792      !! ** Method  : - Look for processors in the same row
793      !!              - Put their number in nrank_znl
794      !!              - Create group for the znl processors
795      !!              - Create a communicator for znl processors
796      !!              - Determine if processor should write znl files
797      !!
798      !! ** output
799      !!      ndim_rank_znl = number of processors on the same row
800      !!      ngrp_znl = group ID for the znl processors
801      !!      ncomm_znl = communicator for the ice procs.
802      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
803      !!
804      !!----------------------------------------------------------------------
805      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
806      !
807      INTEGER :: jproc      ! dummy loop integer
808      INTEGER :: ierr, ii   ! local integer
809      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
810      !!----------------------------------------------------------------------
811      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
812      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
813      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_oce   : ', mpi_comm_oce
814      !
815      ALLOCATE( kwork(jpnij), STAT=ierr )
816      IF( ierr /= 0 ) THEN
817         WRITE(kumout, cform_err)
818         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
819         CALL mppstop
820      ENDIF
821
822      IF( jpnj == 1 ) THEN
823         ngrp_znl  = ngrp_world
824         ncomm_znl = mpi_comm_oce
825      ELSE
826         !
827         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_oce, ierr )
828         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
829         !-$$        CALL flush(numout)
830         !
831         ! Count number of processors on the same row
832         ndim_rank_znl = 0
833         DO jproc=1,jpnij
834            IF ( kwork(jproc) == njmpp ) THEN
835               ndim_rank_znl = ndim_rank_znl + 1
836            ENDIF
837         END DO
838         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
839         !-$$        CALL flush(numout)
840         ! Allocate the right size to nrank_znl
841         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
842         ALLOCATE(nrank_znl(ndim_rank_znl))
843         ii = 0
844         nrank_znl (:) = 0
845         DO jproc=1,jpnij
846            IF ( kwork(jproc) == njmpp) THEN
847               ii = ii + 1
848               nrank_znl(ii) = jproc -1
849            ENDIF
850         END DO
851         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
852         !-$$        CALL flush(numout)
853
854         ! Create the opa group
855         CALL MPI_COMM_GROUP(mpi_comm_oce,ngrp_opa,ierr)
856         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
857         !-$$        CALL flush(numout)
858
859         ! Create the znl group from the opa group
860         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
861         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
862         !-$$        CALL flush(numout)
863
864         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
865         CALL MPI_COMM_CREATE ( mpi_comm_oce, ngrp_znl, ncomm_znl, ierr )
866         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
867         !-$$        CALL flush(numout)
868         !
869      END IF
870
871      ! Determines if processor if the first (starting from i=1) on the row
872      IF ( jpni == 1 ) THEN
873         l_znl_root = .TRUE.
874      ELSE
875         l_znl_root = .FALSE.
876         kwork (1) = nimpp
877         CALL mpp_min ( 'lib_mpp', kwork(1), kcom = ncomm_znl)
878         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
879      END IF
880
881      DEALLOCATE(kwork)
882
883   END SUBROUTINE mpp_ini_znl
884
885
886   SUBROUTINE mpp_ini_north
887      !!----------------------------------------------------------------------
888      !!               ***  routine mpp_ini_north  ***
889      !!
890      !! ** Purpose :   Initialize special communicator for north folding
891      !!      condition together with global variables needed in the mpp folding
892      !!
893      !! ** Method  : - Look for northern processors
894      !!              - Put their number in nrank_north
895      !!              - Create groups for the world processors and the north processors
896      !!              - Create a communicator for northern processors
897      !!
898      !! ** output
899      !!      njmppmax = njmpp for northern procs
900      !!      ndim_rank_north = number of processors in the northern line
901      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
902      !!      ngrp_world = group ID for the world processors
903      !!      ngrp_north = group ID for the northern processors
904      !!      ncomm_north = communicator for the northern procs.
905      !!      north_root = number (in the world) of proc 0 in the northern comm.
906      !!
907      !!----------------------------------------------------------------------
908      INTEGER ::   ierr
909      INTEGER ::   jjproc
910      INTEGER ::   ii, ji
911      !!----------------------------------------------------------------------
912      !
913      njmppmax = MAXVAL( njmppt )
914      !
915      ! Look for how many procs on the northern boundary
916      ndim_rank_north = 0
917      DO jjproc = 1, jpnij
918         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
919      END DO
920      !
921      ! Allocate the right size to nrank_north
922      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
923      ALLOCATE( nrank_north(ndim_rank_north) )
924
925      ! Fill the nrank_north array with proc. number of northern procs.
926      ! Note : the rank start at 0 in MPI
927      ii = 0
928      DO ji = 1, jpnij
929         IF ( njmppt(ji) == njmppmax   ) THEN
930            ii=ii+1
931            nrank_north(ii)=ji-1
932         END IF
933      END DO
934      !
935      ! create the world group
936      CALL MPI_COMM_GROUP( mpi_comm_oce, ngrp_world, ierr )
937      !
938      ! Create the North group from the world group
939      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
940      !
941      ! Create the North communicator , ie the pool of procs in the north group
942      CALL MPI_COMM_CREATE( mpi_comm_oce, ngrp_north, ncomm_north, ierr )
943      !
944   END SUBROUTINE mpp_ini_north
945
946
947   SUBROUTINE mpi_init_oce( ldtxt, ksft, code )
948      !!---------------------------------------------------------------------
949      !!                   ***  routine mpp_init.opa  ***
950      !!
951      !! ** Purpose :: export and attach a MPI buffer for bsend
952      !!
953      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
954      !!            but classical mpi_init
955      !!
956      !! History :: 01/11 :: IDRIS initial version for IBM only
957      !!            08/04 :: R. Benshila, generalisation
958      !!---------------------------------------------------------------------
959      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
960      INTEGER                      , INTENT(inout) ::   ksft
961      INTEGER                      , INTENT(  out) ::   code
962      INTEGER                                      ::   ierr, ji
963      LOGICAL                                      ::   mpi_was_called
964      !!---------------------------------------------------------------------
965      !
966      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
967      IF ( code /= MPI_SUCCESS ) THEN
968         DO ji = 1, SIZE(ldtxt)
969            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
970         END DO
971         WRITE(*, cform_err)
972         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
973         CALL mpi_abort( mpi_comm_world, code, ierr )
974      ENDIF
975      !
976      IF( .NOT. mpi_was_called ) THEN
977         CALL mpi_init( code )
978         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_oce, code )
979         IF ( code /= MPI_SUCCESS ) THEN
980            DO ji = 1, SIZE(ldtxt)
981               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
982            END DO
983            WRITE(*, cform_err)
984            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
985            CALL mpi_abort( mpi_comm_world, code, ierr )
986         ENDIF
987      ENDIF
988      !
989      IF( nn_buffer > 0 ) THEN
990         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
991         ! Buffer allocation and attachment
992         ALLOCATE( tampon(nn_buffer), stat = ierr )
993         IF( ierr /= 0 ) THEN
994            DO ji = 1, SIZE(ldtxt)
995               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
996            END DO
997            WRITE(*, cform_err)
998            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
999            CALL mpi_abort( mpi_comm_world, code, ierr )
1000         END IF
1001         CALL mpi_buffer_attach( tampon, nn_buffer, code )
1002      ENDIF
1003      !
1004   END SUBROUTINE mpi_init_oce
1005
1006
1007   SUBROUTINE DDPDD_MPI( ydda, yddb, ilen, itype )
1008      !!---------------------------------------------------------------------
1009      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
1010      !!
1011      !!   Modification of original codes written by David H. Bailey
1012      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
1013      !!---------------------------------------------------------------------
1014      INTEGER                     , INTENT(in)    ::   ilen, itype
1015      COMPLEX(wp), DIMENSION(ilen), INTENT(in)    ::   ydda
1016      COMPLEX(wp), DIMENSION(ilen), INTENT(inout) ::   yddb
1017      !
1018      REAL(wp) :: zerr, zt1, zt2    ! local work variables
1019      INTEGER  :: ji, ztmp           ! local scalar
1020      !!---------------------------------------------------------------------
1021      !
1022      ztmp = itype   ! avoid compilation warning
1023      !
1024      DO ji=1,ilen
1025      ! Compute ydda + yddb using Knuth's trick.
1026         zt1  = real(ydda(ji)) + real(yddb(ji))
1027         zerr = zt1 - real(ydda(ji))
1028         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
1029                + aimag(ydda(ji)) + aimag(yddb(ji))
1030
1031         ! The result is zt1 + zt2, after normalization.
1032         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
1033      END DO
1034      !
1035   END SUBROUTINE DDPDD_MPI
1036
1037
1038   SUBROUTINE mpp_lbc_north_icb( pt2d, cd_type, psgn, kextj)
1039      !!---------------------------------------------------------------------
1040      !!                   ***  routine mpp_lbc_north_icb  ***
1041      !!
1042      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1043      !!              in mpp configuration in case of jpn1 > 1 and for 2d
1044      !!              array with outer extra halo
1045      !!
1046      !! ** Method  :   North fold condition and mpp with more than one proc
1047      !!              in i-direction require a specific treatment. We gather
1048      !!              the 4+kextj northern lines of the global domain on 1
1049      !!              processor and apply lbc north-fold on this sub array.
1050      !!              Then we scatter the north fold array back to the processors.
1051      !!              This routine accounts for an extra halo with icebergs
1052      !!              and assumes ghost rows and columns have been suppressed.
1053      !!
1054      !!----------------------------------------------------------------------
1055      REAL(wp), DIMENSION(:,:), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1056      CHARACTER(len=1)        , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
1057      !                                                     !   = T ,  U , V , F or W -points
1058      REAL(wp)                , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
1059      !!                                                    ! north fold, =  1. otherwise
1060      INTEGER                 , INTENT(in   ) ::   kextj    ! Extra halo width at north fold
1061      !
1062      INTEGER ::   ji, jj, jr
1063      INTEGER ::   ierr, itaille, ildi, ilei, iilb
1064      INTEGER ::   ipj, ij, iproc
1065      !
1066      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
1067      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
1068      !!----------------------------------------------------------------------
1069      !
1070      ipj=4
1071      ALLOCATE(        ztab_e(jpiglo, 1-kextj:ipj+kextj)       ,       &
1072     &            znorthloc_e(jpimax, 1-kextj:ipj+kextj)       ,       &
1073     &          znorthgloio_e(jpimax, 1-kextj:ipj+kextj,jpni)    )
1074      !
1075      ztab_e(:,:)      = 0._wp
1076      znorthloc_e(:,:) = 0._wp
1077      !
1078      ij = 1 - kextj
1079      ! put the last ipj+2*kextj lines of pt2d into znorthloc_e
1080      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1081         znorthloc_e(1:jpi,ij)=pt2d(1:jpi,jj)
1082         ij = ij + 1
1083      END DO
1084      !
1085      itaille = jpimax * ( ipj + 2*kextj )
1086      !
1087      IF( ln_timing ) CALL tic_tac(.TRUE.)
1088      CALL MPI_ALLGATHER( znorthloc_e(1,1-kextj)    , itaille, MPI_DOUBLE_PRECISION,    &
1089         &                znorthgloio_e(1,1-kextj,1), itaille, MPI_DOUBLE_PRECISION,    &
1090         &                ncomm_north, ierr )
1091      !
1092      IF( ln_timing ) CALL tic_tac(.FALSE.)
1093      !
1094      DO jr = 1, ndim_rank_north            ! recover the global north array
1095         iproc = nrank_north(jr) + 1
1096         ildi = nldit (iproc)
1097         ilei = nleit (iproc)
1098         iilb = nimppt(iproc)
1099         DO jj = 1-kextj, ipj+kextj
1100            DO ji = ildi, ilei
1101               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
1102            END DO
1103         END DO
1104      END DO
1105
1106      ! 2. North-Fold boundary conditions
1107      ! ----------------------------------
1108      CALL lbc_nfd( ztab_e(:,1-kextj:ipj+kextj), cd_type, psgn, kextj )
1109
1110      ij = 1 - kextj
1111      !! Scatter back to pt2d
1112      DO jj = jpj - ipj + 1 - kextj , jpj + kextj
1113         DO ji= 1, jpi
1114            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
1115         END DO
1116         ij  = ij +1
1117      END DO
1118      !
1119      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
1120      !
1121   END SUBROUTINE mpp_lbc_north_icb
1122
1123
1124   SUBROUTINE mpp_lnk_2d_icb( cdname, pt2d, cd_type, psgn, kexti, kextj )
1125      !!----------------------------------------------------------------------
1126      !!                  ***  routine mpp_lnk_2d_icb  ***
1127      !!
1128      !! ** Purpose :   Message passing management for 2d array (with extra halo for icebergs)
1129      !!                This routine receives a (1-kexti:jpi+kexti,1-kexti:jpj+kextj)
1130      !!                array (usually (0:jpi+1, 0:jpj+1)) from lbc_lnk_icb calls.
1131      !!
1132      !! ** Method  :   Use mppsend and mpprecv function for passing mask
1133      !!      between processors following neighboring subdomains.
1134      !!            domain parameters
1135      !!                    jpi    : first dimension of the local subdomain
1136      !!                    jpj    : second dimension of the local subdomain
1137      !!                    kexti  : number of columns for extra outer halo
1138      !!                    kextj  : number of rows for extra outer halo
1139      !!                    nbondi : mark for "east-west local boundary"
1140      !!                    nbondj : mark for "north-south local boundary"
1141      !!                    noea   : number for local neighboring processors
1142      !!                    nowe   : number for local neighboring processors
1143      !!                    noso   : number for local neighboring processors
1144      !!                    nono   : number for local neighboring processors
1145      !!----------------------------------------------------------------------
1146      CHARACTER(len=*)                                        , INTENT(in   ) ::   cdname      ! name of the calling subroutine
1147      REAL(wp), DIMENSION(1-kexti:jpi+kexti,1-kextj:jpj+kextj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
1148      CHARACTER(len=1)                                        , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
1149      REAL(wp)                                                , INTENT(in   ) ::   psgn     ! sign used across the north fold
1150      INTEGER                                                 , INTENT(in   ) ::   kexti    ! extra i-halo width
1151      INTEGER                                                 , INTENT(in   ) ::   kextj    ! extra j-halo width
1152      !
1153      INTEGER  ::   jl   ! dummy loop indices
1154      INTEGER  ::   imigr, iihom, ijhom        ! local integers
1155      INTEGER  ::   ipreci, iprecj             !   -       -
1156      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
1157      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
1158      !!
1159      REAL(wp), DIMENSION(1-kexti:jpi+kexti,nn_hls+kextj,2) ::   r2dns, r2dsn
1160      REAL(wp), DIMENSION(1-kextj:jpj+kextj,nn_hls+kexti,2) ::   r2dwe, r2dew
1161      !!----------------------------------------------------------------------
1162
1163      ipreci = nn_hls + kexti      ! take into account outer extra 2D overlap area
1164      iprecj = nn_hls + kextj
1165
1166      IF( narea == 1 .AND. numcom == -1 ) CALL mpp_report( cdname, 1, 1, 1, ld_lbc = .TRUE. )
1167
1168      ! 1. standard boundary treatment
1169      ! ------------------------------
1170      ! Order matters Here !!!!
1171      !
1172      !                                      ! East-West boundaries
1173      !                                           !* Cyclic east-west
1174      IF( l_Iperio ) THEN
1175         pt2d(1-kexti:     1   ,:) = pt2d(jpim1-kexti: jpim1 ,:)       ! east
1176         pt2d(  jpi  :jpi+kexti,:) = pt2d(     2     :2+kexti,:)       ! west
1177         !
1178      ELSE                                        !* closed
1179         IF( .NOT. cd_type == 'F' )   pt2d(  1-kexti   :nn_hls   ,:) = 0._wp    ! east except at F-point
1180                                      pt2d(jpi-nn_hls+1:jpi+kexti,:) = 0._wp    ! west
1181      ENDIF
1182      !                                      ! North-South boundaries
1183      IF( l_Jperio ) THEN                         !* cyclic (only with no mpp j-split)
1184         pt2d(:,1-kextj:     1   ) = pt2d(:,jpjm1-kextj:  jpjm1)       ! north
1185         pt2d(:,  jpj  :jpj+kextj) = pt2d(:,     2     :2+kextj)       ! south
1186      ELSE                                        !* closed
1187         IF( .NOT. cd_type == 'F' )   pt2d(:,  1-kextj   :nn_hls   ) = 0._wp    ! north except at F-point
1188                                      pt2d(:,jpj-nn_hls+1:jpj+kextj) = 0._wp    ! south
1189      ENDIF
1190      !
1191
1192      ! north fold treatment
1193      ! -----------------------
1194      IF( npolj /= 0 ) THEN
1195         !
1196         SELECT CASE ( jpni )
1197                   CASE ( 1 )     ;   CALL lbc_nfd          ( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1198                   CASE DEFAULT   ;   CALL mpp_lbc_north_icb( pt2d(1:jpi,1:jpj+kextj), cd_type, psgn, kextj )
1199         END SELECT
1200         !
1201      ENDIF
1202
1203      ! 2. East and west directions exchange
1204      ! ------------------------------------
1205      ! we play with the neigbours AND the row number because of the periodicity
1206      !
1207      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
1208      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
1209         iihom = jpi-nreci-kexti
1210         DO jl = 1, ipreci
1211            r2dew(:,jl,1) = pt2d(nn_hls+jl,:)
1212            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1213         END DO
1214      END SELECT
1215      !
1216      !                           ! Migrations
1217      imigr = ipreci * ( jpj + 2*kextj )
1218      !
1219      IF( ln_timing ) CALL tic_tac(.TRUE.)
1220      !
1221      SELECT CASE ( nbondi )
1222      CASE ( -1 )
1223         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req1 )
1224         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1225         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1226      CASE ( 0 )
1227         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1228         CALL mppsend( 2, r2dwe(1-kextj,1,1), imigr, noea, ml_req2 )
1229         CALL mpprecv( 1, r2dew(1-kextj,1,2), imigr, noea )
1230         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1231         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1232         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1233      CASE ( 1 )
1234         CALL mppsend( 1, r2dew(1-kextj,1,1), imigr, nowe, ml_req1 )
1235         CALL mpprecv( 2, r2dwe(1-kextj,1,2), imigr, nowe )
1236         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1237      END SELECT
1238      !
1239      IF( ln_timing ) CALL tic_tac(.FALSE.)
1240      !
1241      !                           ! Write Dirichlet lateral conditions
1242      iihom = jpi - nn_hls
1243      !
1244      SELECT CASE ( nbondi )
1245      CASE ( -1 )
1246         DO jl = 1, ipreci
1247            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1248         END DO
1249      CASE ( 0 )
1250         DO jl = 1, ipreci
1251            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1252            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1253         END DO
1254      CASE ( 1 )
1255         DO jl = 1, ipreci
1256            pt2d(jl-kexti,:) = r2dwe(:,jl,2)
1257         END DO
1258      END SELECT
1259
1260
1261      ! 3. North and south directions
1262      ! -----------------------------
1263      ! always closed : we play only with the neigbours
1264      !
1265      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1266         ijhom = jpj-nrecj-kextj
1267         DO jl = 1, iprecj
1268            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1269            r2dns(:,jl,1) = pt2d(:,nn_hls+jl)
1270         END DO
1271      ENDIF
1272      !
1273      !                           ! Migrations
1274      imigr = iprecj * ( jpi + 2*kexti )
1275      !
1276      IF( ln_timing ) CALL tic_tac(.TRUE.)
1277      !
1278      SELECT CASE ( nbondj )
1279      CASE ( -1 )
1280         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req1 )
1281         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1282         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1283      CASE ( 0 )
1284         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1285         CALL mppsend( 4, r2dsn(1-kexti,1,1), imigr, nono, ml_req2 )
1286         CALL mpprecv( 3, r2dns(1-kexti,1,2), imigr, nono )
1287         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1288         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1289         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1290      CASE ( 1 )
1291         CALL mppsend( 3, r2dns(1-kexti,1,1), imigr, noso, ml_req1 )
1292         CALL mpprecv( 4, r2dsn(1-kexti,1,2), imigr, noso )
1293         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1294      END SELECT
1295      !
1296      IF( ln_timing ) CALL tic_tac(.FALSE.)
1297      !
1298      !                           ! Write Dirichlet lateral conditions
1299      ijhom = jpj - nn_hls
1300      !
1301      SELECT CASE ( nbondj )
1302      CASE ( -1 )
1303         DO jl = 1, iprecj
1304            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1305         END DO
1306      CASE ( 0 )
1307         DO jl = 1, iprecj
1308            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1309            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1310         END DO
1311      CASE ( 1 )
1312         DO jl = 1, iprecj
1313            pt2d(:,jl-kextj) = r2dsn(:,jl,2)
1314         END DO
1315      END SELECT
1316      !
1317   END SUBROUTINE mpp_lnk_2d_icb
1318
1319
1320   SUBROUTINE mpp_report( cdname, kpk, kpl, kpf, ld_lbc, ld_glb )
1321      !!----------------------------------------------------------------------
1322      !!                  ***  routine mpp_report  ***
1323      !!
1324      !! ** Purpose :   report use of mpp routines per time-setp
1325      !!
1326      !!----------------------------------------------------------------------
1327      CHARACTER(len=*),           INTENT(in   ) ::   cdname      ! name of the calling subroutine
1328      INTEGER         , OPTIONAL, INTENT(in   ) ::   kpk, kpl, kpf
1329      LOGICAL         , OPTIONAL, INTENT(in   ) ::   ld_lbc, ld_glb
1330      !!
1331      LOGICAL ::   ll_lbc, ll_glb
1332      INTEGER ::    ji,  jj,  jk,  jh, jf   ! dummy loop indices
1333      !!----------------------------------------------------------------------
1334      !
1335      ll_lbc = .FALSE.
1336      IF( PRESENT(ld_lbc) ) ll_lbc = ld_lbc
1337      ll_glb = .FALSE.
1338      IF( PRESENT(ld_glb) ) ll_glb = ld_glb
1339      !
1340      ! find the smallest common frequency: default = frequency product, if multiple, choose the larger of the 2 frequency
1341      ncom_freq = ncom_fsbc * ncom_dttrc
1342      IF( MOD( MAX(ncom_fsbc,ncom_dttrc), MIN(ncom_fsbc,ncom_dttrc) ) == 0 ) ncom_freq = MAX(ncom_fsbc,ncom_dttrc)
1343      !
1344      IF ( ncom_stp == nit000+ncom_freq ) THEN   ! avoid to count extra communications in potential initializations at nit000
1345         IF( ll_lbc ) THEN
1346            IF( .NOT. ALLOCATED(ncomm_sequence) ) ALLOCATE( ncomm_sequence(ncom_rec_max,2) )
1347            IF( .NOT. ALLOCATED(    crname_lbc) ) ALLOCATE(     crname_lbc(ncom_rec_max  ) )
1348            n_sequence_lbc = n_sequence_lbc + 1
1349            IF( n_sequence_lbc > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1350            crname_lbc(n_sequence_lbc) = cdname     ! keep the name of the calling routine
1351            ncomm_sequence(n_sequence_lbc,1) = kpk*kpl   ! size of 3rd and 4th dimensions
1352            ncomm_sequence(n_sequence_lbc,2) = kpf       ! number of arrays to be treated (multi)
1353         ENDIF
1354         IF( ll_glb ) THEN
1355            IF( .NOT. ALLOCATED(crname_glb) ) ALLOCATE( crname_glb(ncom_rec_max) )
1356            n_sequence_glb = n_sequence_glb + 1
1357            IF( n_sequence_glb > ncom_rec_max ) CALL ctl_stop( 'STOP', 'lnk_generic, increase ncom_rec_max' )   ! deadlock
1358            crname_glb(n_sequence_glb) = cdname     ! keep the name of the calling routine
1359         ENDIF
1360      ELSE IF ( ncom_stp == nit000+2*ncom_freq ) THEN
1361         CALL ctl_opn( numcom, 'communication_report.txt', 'REPLACE', 'FORMATTED', 'SEQUENTIAL', -1, numout, .FALSE., narea )
1362         WRITE(numcom,*) ' '
1363         WRITE(numcom,*) ' ------------------------------------------------------------'
1364         WRITE(numcom,*) ' Communication pattern report (second oce+sbc+top time step):'
1365         WRITE(numcom,*) ' ------------------------------------------------------------'
1366         WRITE(numcom,*) ' '
1367         WRITE(numcom,'(A,I4)') ' Exchanged halos : ', n_sequence_lbc
1368         jj = 0; jk = 0; jf = 0; jh = 0
1369         DO ji = 1, n_sequence_lbc
1370            IF ( ncomm_sequence(ji,1) .GT. 1 ) jk = jk + 1
1371            IF ( ncomm_sequence(ji,2) .GT. 1 ) jf = jf + 1
1372            IF ( ncomm_sequence(ji,1) .GT. 1 .AND. ncomm_sequence(ji,2) .GT. 1 ) jj = jj + 1
1373            jh = MAX (jh, ncomm_sequence(ji,1)*ncomm_sequence(ji,2))
1374         END DO
1375         WRITE(numcom,'(A,I3)') ' 3D Exchanged halos : ', jk
1376         WRITE(numcom,'(A,I3)') ' Multi arrays exchanged halos : ', jf
1377         WRITE(numcom,'(A,I3)') '   from which 3D : ', jj
1378         WRITE(numcom,'(A,I10)') ' Array max size : ', jh*jpi*jpj
1379         WRITE(numcom,*) ' '
1380         WRITE(numcom,*) ' lbc_lnk called'
1381         jj = 1
1382         DO ji = 2, n_sequence_lbc
1383            IF( crname_lbc(ji-1) /= crname_lbc(ji) ) THEN
1384               WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(ji-1))
1385               jj = 0
1386            END IF
1387            jj = jj + 1 
1388         END DO
1389         WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_lbc(n_sequence_lbc))
1390         WRITE(numcom,*) ' '
1391         IF ( n_sequence_glb > 0 ) THEN
1392            WRITE(numcom,'(A,I4)') ' Global communications : ', n_sequence_glb
1393            jj = 1
1394            DO ji = 2, n_sequence_glb
1395               IF( crname_glb(ji-1) /= crname_glb(ji) ) THEN
1396                  WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(ji-1))
1397                  jj = 0
1398               END IF
1399               jj = jj + 1 
1400            END DO
1401            WRITE(numcom,'(A, I4, A, A)') ' - ', jj,' times by subroutine ', TRIM(crname_glb(n_sequence_glb))
1402            DEALLOCATE(crname_glb)
1403         ELSE
1404            WRITE(numcom,*) ' No MPI global communication '
1405         ENDIF
1406         WRITE(numcom,*) ' '
1407         WRITE(numcom,*) ' -----------------------------------------------'
1408         WRITE(numcom,*) ' '
1409         DEALLOCATE(ncomm_sequence)
1410         DEALLOCATE(crname_lbc)
1411      ENDIF
1412   END SUBROUTINE mpp_report
1413
1414   
1415   SUBROUTINE tic_tac (ld_tic, ld_global)
1416
1417    LOGICAL,           INTENT(IN) :: ld_tic
1418    LOGICAL, OPTIONAL, INTENT(IN) :: ld_global
1419    REAL(wp), DIMENSION(2), SAVE :: tic_wt
1420    REAL(wp),               SAVE :: tic_ct = 0._wp
1421    INTEGER :: ii
1422
1423    IF( ncom_stp <= nit000 ) RETURN
1424    IF( ncom_stp == nitend ) RETURN
1425    ii = 1
1426    IF( PRESENT( ld_global ) ) THEN
1427       IF( ld_global ) ii = 2
1428    END IF
1429   
1430    IF ( ld_tic ) THEN
1431       tic_wt(ii) = MPI_Wtime()                                                    ! start count tic->tac (waiting time)
1432       IF ( tic_ct > 0.0_wp ) compute_time = compute_time + MPI_Wtime() - tic_ct   ! cumulate count tac->tic
1433    ELSE
1434       waiting_time(ii) = waiting_time(ii) + MPI_Wtime() - tic_wt(ii)              ! cumulate count tic->tac
1435       tic_ct = MPI_Wtime()                                                        ! start count tac->tic (waiting time)
1436    ENDIF
1437   
1438   END SUBROUTINE tic_tac
1439
1440   
1441#else
1442   !!----------------------------------------------------------------------
1443   !!   Default case:            Dummy module        share memory computing
1444   !!----------------------------------------------------------------------
1445   USE in_out_manager
1446
1447   INTERFACE mpp_sum
1448      MODULE PROCEDURE mppsum_int, mppsum_a_int, mppsum_real, mppsum_a_real, mppsum_realdd, mppsum_a_realdd
1449   END INTERFACE
1450   INTERFACE mpp_max
1451      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
1452   END INTERFACE
1453   INTERFACE mpp_min
1454      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
1455   END INTERFACE
1456   INTERFACE mpp_minloc
1457      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
1458   END INTERFACE
1459   INTERFACE mpp_maxloc
1460      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
1461   END INTERFACE
1462
1463   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
1464   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
1465   INTEGER, PUBLIC            ::   mpi_comm_oce          ! opa local communicator
1466   !!----------------------------------------------------------------------
1467CONTAINS
1468
1469   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
1470      INTEGER, INTENT(in) ::   kumout
1471      lib_mpp_alloc = 0
1472   END FUNCTION lib_mpp_alloc
1473
1474   FUNCTION mynode( ldtxt, ldname, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
1475      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
1476      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
1477      CHARACTER(len=*) ::   ldname
1478      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
1479      IF( PRESENT( localComm ) ) mpi_comm_oce = localComm
1480      function_value = 0
1481      IF( .FALSE. )   ldtxt(:) = 'never done'
1482      CALL ctl_opn( kumond, TRIM(ldname), 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
1483   END FUNCTION mynode
1484
1485   SUBROUTINE mppsync                       ! Dummy routine
1486   END SUBROUTINE mppsync
1487
1488   !!----------------------------------------------------------------------
1489   !!    ***  mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real  ***
1490   !!   
1491   !!----------------------------------------------------------------------
1492   !!
1493#  define OPERATION_MAX
1494#  define INTEGER_TYPE
1495#  define DIM_0d
1496#     define ROUTINE_ALLREDUCE           mppmax_int
1497#     include "mpp_allreduce_generic.h90"
1498#     undef ROUTINE_ALLREDUCE
1499#  undef DIM_0d
1500#  define DIM_1d
1501#     define ROUTINE_ALLREDUCE           mppmax_a_int
1502#     include "mpp_allreduce_generic.h90"
1503#     undef ROUTINE_ALLREDUCE
1504#  undef DIM_1d
1505#  undef INTEGER_TYPE
1506!
1507#  define REAL_TYPE
1508#  define DIM_0d
1509#     define ROUTINE_ALLREDUCE           mppmax_real
1510#     include "mpp_allreduce_generic.h90"
1511#     undef ROUTINE_ALLREDUCE
1512#  undef DIM_0d
1513#  define DIM_1d
1514#     define ROUTINE_ALLREDUCE           mppmax_a_real
1515#     include "mpp_allreduce_generic.h90"
1516#     undef ROUTINE_ALLREDUCE
1517#  undef DIM_1d
1518#  undef REAL_TYPE
1519#  undef OPERATION_MAX
1520   !!----------------------------------------------------------------------
1521   !!    ***  mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real  ***
1522   !!   
1523   !!----------------------------------------------------------------------
1524   !!
1525#  define OPERATION_MIN
1526#  define INTEGER_TYPE
1527#  define DIM_0d
1528#     define ROUTINE_ALLREDUCE           mppmin_int
1529#     include "mpp_allreduce_generic.h90"
1530#     undef ROUTINE_ALLREDUCE
1531#  undef DIM_0d
1532#  define DIM_1d
1533#     define ROUTINE_ALLREDUCE           mppmin_a_int
1534#     include "mpp_allreduce_generic.h90"
1535#     undef ROUTINE_ALLREDUCE
1536#  undef DIM_1d
1537#  undef INTEGER_TYPE
1538!
1539#  define REAL_TYPE
1540#  define DIM_0d
1541#     define ROUTINE_ALLREDUCE           mppmin_real
1542#     include "mpp_allreduce_generic.h90"
1543#     undef ROUTINE_ALLREDUCE
1544#  undef DIM_0d
1545#  define DIM_1d
1546#     define ROUTINE_ALLREDUCE           mppmin_a_real
1547#     include "mpp_allreduce_generic.h90"
1548#     undef ROUTINE_ALLREDUCE
1549#  undef DIM_1d
1550#  undef REAL_TYPE
1551#  undef OPERATION_MIN
1552
1553   !!----------------------------------------------------------------------
1554   !!    ***  mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real  ***
1555   !!   
1556   !!   Global sum of 1D array or a variable (integer, real or complex)
1557   !!----------------------------------------------------------------------
1558   !!
1559#  define OPERATION_SUM
1560#  define INTEGER_TYPE
1561#  define DIM_0d
1562#     define ROUTINE_ALLREDUCE           mppsum_int
1563#     include "mpp_allreduce_generic.h90"
1564#     undef ROUTINE_ALLREDUCE
1565#  undef DIM_0d
1566#  define DIM_1d
1567#     define ROUTINE_ALLREDUCE           mppsum_a_int
1568#     include "mpp_allreduce_generic.h90"
1569#     undef ROUTINE_ALLREDUCE
1570#  undef DIM_1d
1571#  undef INTEGER_TYPE
1572!
1573#  define REAL_TYPE
1574#  define DIM_0d
1575#     define ROUTINE_ALLREDUCE           mppsum_real
1576#     include "mpp_allreduce_generic.h90"
1577#     undef ROUTINE_ALLREDUCE
1578#  undef DIM_0d
1579#  define DIM_1d
1580#     define ROUTINE_ALLREDUCE           mppsum_a_real
1581#     include "mpp_allreduce_generic.h90"
1582#     undef ROUTINE_ALLREDUCE
1583#  undef DIM_1d
1584#  undef REAL_TYPE
1585#  undef OPERATION_SUM
1586
1587#  define OPERATION_SUM_DD
1588#  define COMPLEX_TYPE
1589#  define DIM_0d
1590#     define ROUTINE_ALLREDUCE           mppsum_realdd
1591#     include "mpp_allreduce_generic.h90"
1592#     undef ROUTINE_ALLREDUCE
1593#  undef DIM_0d
1594#  define DIM_1d
1595#     define ROUTINE_ALLREDUCE           mppsum_a_realdd
1596#     include "mpp_allreduce_generic.h90"
1597#     undef ROUTINE_ALLREDUCE
1598#  undef DIM_1d
1599#  undef COMPLEX_TYPE
1600#  undef OPERATION_SUM_DD
1601
1602   !!----------------------------------------------------------------------
1603   !!    ***  mpp_minloc2d, mpp_minloc3d, mpp_maxloc2d, mpp_maxloc3d
1604   !!   
1605   !!----------------------------------------------------------------------
1606   !!
1607#  define OPERATION_MINLOC
1608#  define DIM_2d
1609#     define ROUTINE_LOC           mpp_minloc2d
1610#     include "mpp_loc_generic.h90"
1611#     undef ROUTINE_LOC
1612#  undef DIM_2d
1613#  define DIM_3d
1614#     define ROUTINE_LOC           mpp_minloc3d
1615#     include "mpp_loc_generic.h90"
1616#     undef ROUTINE_LOC
1617#  undef DIM_3d
1618#  undef OPERATION_MINLOC
1619
1620#  define OPERATION_MAXLOC
1621#  define DIM_2d
1622#     define ROUTINE_LOC           mpp_maxloc2d
1623#     include "mpp_loc_generic.h90"
1624#     undef ROUTINE_LOC
1625#  undef DIM_2d
1626#  define DIM_3d
1627#     define ROUTINE_LOC           mpp_maxloc3d
1628#     include "mpp_loc_generic.h90"
1629#     undef ROUTINE_LOC
1630#  undef DIM_3d
1631#  undef OPERATION_MAXLOC
1632
1633   SUBROUTINE mpp_ilor( ld_switch, ldlast, kcom )
1634      LOGICAL, INTENT(in   ), DIMENSION(2) ::   ld_switch
1635      LOGICAL, INTENT(in   ), OPTIONAL     ::   ldlast
1636      INTEGER, INTENT(in   ), OPTIONAL     ::   kcom    ! ???
1637      WRITE(*,*) 'mpp_ilor: You should not have seen this print! error?', ld_switch
1638   END SUBROUTINE mpp_ilor
1639
1640   SUBROUTINE mppstop
1641      STOP      ! non MPP case, just stop the run
1642   END SUBROUTINE mppstop
1643
1644   SUBROUTINE mpp_ini_znl( knum )
1645      INTEGER :: knum
1646      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
1647   END SUBROUTINE mpp_ini_znl
1648
1649   SUBROUTINE mpp_comm_free( kcom )
1650      INTEGER :: kcom
1651      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
1652   END SUBROUTINE mpp_comm_free
1653   
1654#endif
1655
1656   !!----------------------------------------------------------------------
1657   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
1658   !!----------------------------------------------------------------------
1659
1660   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
1661      &                 cd6, cd7, cd8, cd9, cd10 )
1662      !!----------------------------------------------------------------------
1663      !!                  ***  ROUTINE  stop_opa  ***
1664      !!
1665      !! ** Purpose :   print in ocean.outpput file a error message and
1666      !!                increment the error number (nstop) by one.
1667      !!----------------------------------------------------------------------
1668      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1669      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1670      !!----------------------------------------------------------------------
1671      !
1672      nstop = nstop + 1
1673      IF(lwp) THEN
1674         WRITE(numout,cform_err)
1675         IF( PRESENT(cd1 ) )   WRITE(numout,*) TRIM(cd1)
1676         IF( PRESENT(cd2 ) )   WRITE(numout,*) TRIM(cd2)
1677         IF( PRESENT(cd3 ) )   WRITE(numout,*) TRIM(cd3)
1678         IF( PRESENT(cd4 ) )   WRITE(numout,*) TRIM(cd4)
1679         IF( PRESENT(cd5 ) )   WRITE(numout,*) TRIM(cd5)
1680         IF( PRESENT(cd6 ) )   WRITE(numout,*) TRIM(cd6)
1681         IF( PRESENT(cd7 ) )   WRITE(numout,*) TRIM(cd7)
1682         IF( PRESENT(cd8 ) )   WRITE(numout,*) TRIM(cd8)
1683         IF( PRESENT(cd9 ) )   WRITE(numout,*) TRIM(cd9)
1684         IF( PRESENT(cd10) )   WRITE(numout,*) TRIM(cd10)
1685      ENDIF
1686                               CALL FLUSH(numout    )
1687      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
1688      IF( numrun     /= -1 )   CALL FLUSH(numrun    )
1689      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
1690      !
1691      IF( cd1 == 'STOP' ) THEN
1692         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
1693         CALL mppstop()
1694      ENDIF
1695      !
1696   END SUBROUTINE ctl_stop
1697
1698
1699   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
1700      &                 cd6, cd7, cd8, cd9, cd10 )
1701      !!----------------------------------------------------------------------
1702      !!                  ***  ROUTINE  stop_warn  ***
1703      !!
1704      !! ** Purpose :   print in ocean.outpput file a error message and
1705      !!                increment the warning number (nwarn) by one.
1706      !!----------------------------------------------------------------------
1707      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
1708      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
1709      !!----------------------------------------------------------------------
1710      !
1711      nwarn = nwarn + 1
1712      IF(lwp) THEN
1713         WRITE(numout,cform_war)
1714         IF( PRESENT(cd1 ) ) WRITE(numout,*) TRIM(cd1)
1715         IF( PRESENT(cd2 ) ) WRITE(numout,*) TRIM(cd2)
1716         IF( PRESENT(cd3 ) ) WRITE(numout,*) TRIM(cd3)
1717         IF( PRESENT(cd4 ) ) WRITE(numout,*) TRIM(cd4)
1718         IF( PRESENT(cd5 ) ) WRITE(numout,*) TRIM(cd5)
1719         IF( PRESENT(cd6 ) ) WRITE(numout,*) TRIM(cd6)
1720         IF( PRESENT(cd7 ) ) WRITE(numout,*) TRIM(cd7)
1721         IF( PRESENT(cd8 ) ) WRITE(numout,*) TRIM(cd8)
1722         IF( PRESENT(cd9 ) ) WRITE(numout,*) TRIM(cd9)
1723         IF( PRESENT(cd10) ) WRITE(numout,*) TRIM(cd10)
1724      ENDIF
1725      CALL FLUSH(numout)
1726      !
1727   END SUBROUTINE ctl_warn
1728
1729
1730   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
1731      !!----------------------------------------------------------------------
1732      !!                  ***  ROUTINE ctl_opn  ***
1733      !!
1734      !! ** Purpose :   Open file and check if required file is available.
1735      !!
1736      !! ** Method  :   Fortan open
1737      !!----------------------------------------------------------------------
1738      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
1739      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
1740      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
1741      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
1742      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
1743      INTEGER          , INTENT(in   ) ::   klengh    ! record length
1744      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
1745      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
1746      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
1747      !
1748      CHARACTER(len=80) ::   clfile
1749      INTEGER           ::   iost
1750      !!----------------------------------------------------------------------
1751      !
1752      ! adapt filename
1753      ! ----------------
1754      clfile = TRIM(cdfile)
1755      IF( PRESENT( karea ) ) THEN
1756         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
1757      ENDIF
1758#if defined key_agrif
1759      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
1760      knum=Agrif_Get_Unit()
1761#else
1762      knum=get_unit()
1763#endif
1764      IF( TRIM(cdfile) == '/dev/null' )   clfile = TRIM(cdfile)   ! force the use of /dev/null
1765      !
1766      iost=0
1767      IF( cdacce(1:6) == 'DIRECT' )  THEN
1768         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
1769      ELSE
1770         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
1771      ENDIF
1772      IF( iost /= 0 .AND. TRIM(clfile) == '/dev/null' ) &
1773         &  OPEN(UNIT=knum,FILE='NUL', FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )   ! for windows
1774      IF( iost == 0 ) THEN
1775         IF(ldwp) THEN
1776            WRITE(kout,*) '     file   : ', clfile,' open ok'
1777            WRITE(kout,*) '     unit   = ', knum
1778            WRITE(kout,*) '     status = ', cdstat
1779            WRITE(kout,*) '     form   = ', cdform
1780            WRITE(kout,*) '     access = ', cdacce
1781            WRITE(kout,*)
1782         ENDIF
1783      ENDIF
1784100   CONTINUE
1785      IF( iost /= 0 ) THEN
1786         IF(ldwp) THEN
1787            WRITE(kout,*)
1788            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
1789            WRITE(kout,*) ' =======   ===  '
1790            WRITE(kout,*) '           unit   = ', knum
1791            WRITE(kout,*) '           status = ', cdstat
1792            WRITE(kout,*) '           form   = ', cdform
1793            WRITE(kout,*) '           access = ', cdacce
1794            WRITE(kout,*) '           iostat = ', iost
1795            WRITE(kout,*) '           we stop. verify the file '
1796            WRITE(kout,*)
1797         ELSE  !!! Force writing to make sure we get the information - at least once - in this violent STOP!!
1798            WRITE(*,*)
1799            WRITE(*,*) ' ===>>>> : bad opening file: ', clfile
1800            WRITE(*,*) ' =======   ===  '
1801            WRITE(*,*) '           unit   = ', knum
1802            WRITE(*,*) '           status = ', cdstat
1803            WRITE(*,*) '           form   = ', cdform
1804            WRITE(*,*) '           access = ', cdacce
1805            WRITE(*,*) '           iostat = ', iost
1806            WRITE(*,*) '           we stop. verify the file '
1807            WRITE(*,*)
1808         ENDIF
1809         CALL FLUSH( kout ) 
1810         STOP 'ctl_opn bad opening'
1811      ENDIF
1812      !
1813   END SUBROUTINE ctl_opn
1814
1815
1816   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
1817      !!----------------------------------------------------------------------
1818      !!                  ***  ROUTINE ctl_nam  ***
1819      !!
1820      !! ** Purpose :   Informations when error while reading a namelist
1821      !!
1822      !! ** Method  :   Fortan open
1823      !!----------------------------------------------------------------------
1824      INTEGER         , INTENT(inout) ::   kios    ! IO status after reading the namelist
1825      CHARACTER(len=*), INTENT(in   ) ::   cdnam   ! group name of namelist for which error occurs
1826      CHARACTER(len=5)                ::   clios   ! string to convert iostat in character for print
1827      LOGICAL         , INTENT(in   ) ::   ldwp    ! boolean term for print
1828      !!----------------------------------------------------------------------
1829      !
1830      WRITE (clios, '(I5.0)')   kios
1831      IF( kios < 0 ) THEN         
1832         CALL ctl_warn( 'end of record or file while reading namelist '   &
1833            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1834      ENDIF
1835      !
1836      IF( kios > 0 ) THEN
1837         CALL ctl_stop( 'misspelled variable in namelist '   &
1838            &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
1839      ENDIF
1840      kios = 0
1841      RETURN
1842      !
1843   END SUBROUTINE ctl_nam
1844
1845
1846   INTEGER FUNCTION get_unit()
1847      !!----------------------------------------------------------------------
1848      !!                  ***  FUNCTION  get_unit  ***
1849      !!
1850      !! ** Purpose :   return the index of an unused logical unit
1851      !!----------------------------------------------------------------------
1852      LOGICAL :: llopn
1853      !!----------------------------------------------------------------------
1854      !
1855      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
1856      llopn = .TRUE.
1857      DO WHILE( (get_unit < 998) .AND. llopn )
1858         get_unit = get_unit + 1
1859         INQUIRE( unit = get_unit, opened = llopn )
1860      END DO
1861      IF( (get_unit == 999) .AND. llopn ) THEN
1862         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
1863         get_unit = -1
1864      ENDIF
1865      !
1866   END FUNCTION get_unit
1867
1868   !!----------------------------------------------------------------------
1869END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.