New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
lib_mpp.F90 in trunk/NEMOGCM/NEMO/OPA_SRC/LBC – NEMO

source: trunk/NEMOGCM/NEMO/OPA_SRC/LBC/lib_mpp.F90 @ 4645

Last change on this file since 4645 was 4645, checked in by epico, 10 years ago

bug fixes for the north-fold optimization. see ticket #1195

  • Property svn:keywords set to Id
File size: 137.0 KB
Line 
1MODULE lib_mpp
2   !!======================================================================
3   !!                       ***  MODULE  lib_mpp  ***
4   !! Ocean numerics:  massively parallel processing library
5   !!=====================================================================
6   !! History :  OPA  !  1994  (M. Guyon, J. Escobar, M. Imbard)  Original code
7   !!            7.0  !  1997  (A.M. Treguier)  SHMEM additions
8   !!            8.0  !  1998  (M. Imbard, J. Escobar, L. Colombet ) SHMEM and MPI
9   !!                 !  1998  (J.M. Molines) Open boundary conditions
10   !!   NEMO     1.0  !  2003  (J.-M. Molines, G. Madec)  F90, free form
11   !!                 !  2003  (J.M. Molines) add mpp_ini_north(_3d,_2d)
12   !!             -   !  2004  (R. Bourdalle Badie)  isend option in mpi
13   !!                 !  2004  (J.M. Molines) minloc, maxloc
14   !!             -   !  2005  (G. Madec, S. Masson)  npolj=5,6 F-point & ice cases
15   !!             -   !  2005  (R. Redler) Replacement of MPI_COMM_WORLD except for MPI_Abort
16   !!             -   !  2005  (R. Benshila, G. Madec)  add extra halo case
17   !!             -   !  2008  (R. Benshila) add mpp_ini_ice
18   !!            3.2  !  2009  (R. Benshila) SHMEM suppression, north fold in lbc_nfd
19   !!            3.2  !  2009  (O. Marti)    add mpp_ini_znl
20   !!            4.0  !  2011  (G. Madec)  move ctl_ routines from in_out_manager
21   !!            3.5  !  2012  (S.Mocavero, I. Epicoco) Add 'mpp_lnk_bdy_3d', 'mpp_lnk_obc_3d',
22   !!                          'mpp_lnk_bdy_2d' and 'mpp_lnk_obc_2d' routines and update
23   !!                          the mppobc routine to optimize the BDY and OBC communications
24   !!            3.5  !  2013  ( C. Ethe, G. Madec ) message passing arrays as local variables
25   !!            3.5  !  2013 (S.Mocavero, I.Epicoco - CMCC) north fold optimizations
26   !!----------------------------------------------------------------------
27
28   !!----------------------------------------------------------------------
29   !!   ctl_stop   : update momentum and tracer Kz from a tke scheme
30   !!   ctl_warn   : initialization, namelist read, and parameters control
31   !!   ctl_opn    : Open file and check if required file is available.
32   !!   ctl_nam    : Prints informations when an error occurs while reading a namelist
33   !!   get_unit   : give the index of an unused logical unit
34   !!----------------------------------------------------------------------
35#if   defined key_mpp_mpi
36   !!----------------------------------------------------------------------
37   !!   'key_mpp_mpi'             MPI massively parallel processing library
38   !!----------------------------------------------------------------------
39   !!   lib_mpp_alloc : allocate mpp arrays
40   !!   mynode        : indentify the processor unit
41   !!   mpp_lnk       : interface (defined in lbclnk) for message passing of 2d or 3d arrays (mpp_lnk_2d, mpp_lnk_3d)
42   !!   mpp_lnk_3d_gather :  Message passing manadgement for two 3D arrays
43   !!   mpp_lnk_e     : interface (defined in lbclnk) for message passing of 2d array with extra halo (mpp_lnk_2d_e)
44   !!   mpprecv         :
45   !!   mppsend       :   SUBROUTINE mpp_ini_znl
46   !!   mppscatter    :
47   !!   mppgather     :
48   !!   mpp_min       : generic interface for mppmin_int , mppmin_a_int , mppmin_real, mppmin_a_real
49   !!   mpp_max       : generic interface for mppmax_int , mppmax_a_int , mppmax_real, mppmax_a_real
50   !!   mpp_sum       : generic interface for mppsum_int , mppsum_a_int , mppsum_real, mppsum_a_real
51   !!   mpp_minloc    :
52   !!   mpp_maxloc    :
53   !!   mppsync       :
54   !!   mppstop       :
55   !!   mpp_ini_north : initialisation of north fold
56   !!   mpp_lbc_north : north fold processors gathering
57   !!   mpp_lbc_north_e : variant of mpp_lbc_north for extra outer halo
58   !!----------------------------------------------------------------------
59   USE dom_oce        ! ocean space and time domain
60   USE lbcnfd         ! north fold treatment
61   USE in_out_manager ! I/O manager
62
63   IMPLICIT NONE
64   PRIVATE
65   
66   PUBLIC   ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam
67   PUBLIC   mynode, mppstop, mppsync, mpp_comm_free
68   PUBLIC   mpp_ini_north, mpp_lbc_north, mpp_lbc_north_e
69   PUBLIC   mpp_min, mpp_max, mpp_sum, mpp_minloc, mpp_maxloc
70   PUBLIC   mpp_lnk_3d, mpp_lnk_3d_gather, mpp_lnk_2d, mpp_lnk_2d_e
71   PUBLIC   mppscatter, mppgather
72   PUBLIC   mpp_ini_ice, mpp_ini_znl
73   PUBLIC   mppsize
74   PUBLIC   mppsend, mpprecv                          ! needed by TAM and ICB routines
75   PUBLIC   mpp_lnk_bdy_2d, mpp_lnk_bdy_3d
76
77   !! * Interfaces
78   !! define generic interface for these routine as they are called sometimes
79   !! with scalar arguments instead of array arguments, which causes problems
80   !! for the compilation on AIX system as well as NEC and SGI. Ok on COMPACQ
81   INTERFACE mpp_min
82      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
83   END INTERFACE
84   INTERFACE mpp_max
85      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
86   END INTERFACE
87   INTERFACE mpp_sum
88      MODULE PROCEDURE mppsum_a_int, mppsum_int, mppsum_a_real, mppsum_real, &
89                       mppsum_realdd, mppsum_a_realdd
90   END INTERFACE
91   INTERFACE mpp_lbc_north
92      MODULE PROCEDURE mpp_lbc_north_3d, mpp_lbc_north_2d
93   END INTERFACE
94   INTERFACE mpp_minloc
95      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
96   END INTERFACE
97   INTERFACE mpp_maxloc
98      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
99   END INTERFACE
100
101   !! ========================= !!
102   !!  MPI  variable definition !!
103   !! ========================= !!
104!$AGRIF_DO_NOT_TREAT
105   INCLUDE 'mpif.h'
106!$AGRIF_END_DO_NOT_TREAT
107
108   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .TRUE.    !: mpp flag
109
110   INTEGER, PARAMETER         ::   nprocmax = 2**10   ! maximun dimension (required to be a power of 2)
111
112   INTEGER ::   mppsize        ! number of process
113   INTEGER ::   mpprank        ! process number  [ 0 - size-1 ]
114!$AGRIF_DO_NOT_TREAT
115   INTEGER, PUBLIC ::   mpi_comm_opa   ! opa local communicator
116!$AGRIF_END_DO_NOT_TREAT
117
118   INTEGER :: MPI_SUMDD
119
120   ! variables used in case of sea-ice
121   INTEGER, PUBLIC ::   ncomm_ice       !: communicator made by the processors with sea-ice (public so that it can be freed in limthd)
122   INTEGER ::   ngrp_iworld     !  group ID for the world processors (for rheology)
123   INTEGER ::   ngrp_ice        !  group ID for the ice processors (for rheology)
124   INTEGER ::   ndim_rank_ice   !  number of 'ice' processors
125   INTEGER ::   n_ice_root      !  number (in the comm_ice) of proc 0 in the ice comm
126   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_ice     ! dimension ndim_rank_ice
127
128   ! variables used for zonal integration
129   INTEGER, PUBLIC ::   ncomm_znl       !: communicator made by the processors on the same zonal average
130   LOGICAL, PUBLIC ::   l_znl_root      ! True on the 'left'most processor on the same row
131   INTEGER ::   ngrp_znl        ! group ID for the znl processors
132   INTEGER ::   ndim_rank_znl   ! number of processors on the same zonal average
133   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE ::   nrank_znl  ! dimension ndim_rank_znl, number of the procs into the same znl domain
134
135   ! North fold condition in mpp_mpi with jpni > 1 (PUBLIC for TAM)
136   INTEGER, PUBLIC ::   ngrp_world        ! group ID for the world processors
137   INTEGER, PUBLIC ::   ngrp_opa          ! group ID for the opa processors
138   INTEGER, PUBLIC ::   ngrp_north        ! group ID for the northern processors (to be fold)
139   INTEGER, PUBLIC ::   ncomm_north       ! communicator made by the processors belonging to ngrp_north
140   INTEGER, PUBLIC ::   ndim_rank_north   ! number of 'sea' processor in the northern line (can be /= jpni !)
141   INTEGER, PUBLIC ::   njmppmax          ! value of njmpp for the processors of the northern line
142   INTEGER, PUBLIC ::   north_root        ! number (in the comm_opa) of proc 0 in the northern comm
143   INTEGER, DIMENSION(:), ALLOCATABLE, SAVE, PUBLIC ::   nrank_north   ! dimension ndim_rank_north
144
145   ! Type of send : standard, buffered, immediate
146   CHARACTER(len=1), PUBLIC ::   cn_mpi_send   ! type od mpi send/recieve (S=standard, B=bsend, I=isend)
147   LOGICAL, PUBLIC          ::   l_isend = .FALSE.   ! isend use indicator (T if cn_mpi_send='I')
148   INTEGER, PUBLIC          ::   nn_buffer     ! size of the buffer in case of mpi_bsend
149
150   REAL(wp), DIMENSION(:), ALLOCATABLE, SAVE :: tampon  ! buffer in case of bsend
151
152   LOGICAL, PUBLIC                                  ::   ln_nnogather       ! namelist control of northfold comms
153   LOGICAL, PUBLIC                                  ::   l_north_nogather = .FALSE.  ! internal control of northfold comms
154   INTEGER, PUBLIC                                  ::   ityp
155   !!----------------------------------------------------------------------
156   !! NEMO/OPA 3.3 , NEMO Consortium (2010)
157   !! $Id$
158   !! Software governed by the CeCILL licence     (NEMOGCM/NEMO_CeCILL.txt)
159   !!----------------------------------------------------------------------
160CONTAINS
161
162
163   FUNCTION mynode( ldtxt, kumnam_ref , kumnam_cfg , kumond , kstop, localComm )
164      !!----------------------------------------------------------------------
165      !!                  ***  routine mynode  ***
166      !!
167      !! ** Purpose :   Find processor unit
168      !!----------------------------------------------------------------------
169      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
170      INTEGER                      , INTENT(in   ) ::   kumnam_ref     ! logical unit for reference namelist
171      INTEGER                      , INTENT(in   ) ::   kumnam_cfg     ! logical unit for configuration namelist
172      INTEGER                      , INTENT(inout) ::   kumond         ! logical unit for namelist output
173      INTEGER                      , INTENT(inout) ::   kstop          ! stop indicator
174      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
175      !
176      INTEGER ::   mynode, ierr, code, ji, ii, ios
177      LOGICAL ::   mpi_was_called
178      !
179      NAMELIST/nammpp/ cn_mpi_send, nn_buffer, jpni, jpnj, jpnij, ln_nnogather
180      !!----------------------------------------------------------------------
181      !
182      ii = 1
183      WRITE(ldtxt(ii),*)                                                                          ;   ii = ii + 1
184      WRITE(ldtxt(ii),*) 'mynode : mpi initialisation'                                            ;   ii = ii + 1
185      WRITE(ldtxt(ii),*) '~~~~~~ '                                                                ;   ii = ii + 1
186      !
187
188      REWIND( kumnam_ref )              ! Namelist nammpp in reference namelist: mpi variables
189      READ  ( kumnam_ref, nammpp, IOSTAT = ios, ERR = 901)
190901   IF( ios /= 0 ) CALL ctl_nam ( ios , 'nammpp in reference namelist', lwp )
191
192      REWIND( kumnam_cfg )              ! Namelist nammpp in configuration namelist: mpi variables
193      READ  ( kumnam_cfg, nammpp, IOSTAT = ios, ERR = 902 )
194902   IF( ios /= 0 ) CALL ctl_nam ( ios , 'nammpp in configuration namelist', lwp )
195
196      !                              ! control print
197      WRITE(ldtxt(ii),*) '   Namelist nammpp'                                                     ;   ii = ii + 1
198      WRITE(ldtxt(ii),*) '      mpi send type                      cn_mpi_send = ', cn_mpi_send   ;   ii = ii + 1
199      WRITE(ldtxt(ii),*) '      size in bytes of exported buffer   nn_buffer   = ', nn_buffer     ;   ii = ii + 1
200
201#if defined key_agrif
202      IF( .NOT. Agrif_Root() ) THEN
203         jpni  = Agrif_Parent(jpni )
204         jpnj  = Agrif_Parent(jpnj )
205         jpnij = Agrif_Parent(jpnij)
206      ENDIF
207#endif
208
209      IF(jpnij < 1)THEN
210         ! If jpnij is not specified in namelist then we calculate it - this
211         ! means there will be no land cutting out.
212         jpnij = jpni * jpnj
213      END IF
214
215      IF( (jpni < 1) .OR. (jpnj < 1) )THEN
216         WRITE(ldtxt(ii),*) '      jpni, jpnj and jpnij will be calculated automatically'; ii = ii + 1
217      ELSE
218         WRITE(ldtxt(ii),*) '      processor grid extent in i         jpni = ',jpni; ii = ii + 1
219         WRITE(ldtxt(ii),*) '      processor grid extent in j         jpnj = ',jpnj; ii = ii + 1
220         WRITE(ldtxt(ii),*) '      number of local domains           jpnij = ',jpnij; ii = ii +1
221      END IF
222
223      WRITE(ldtxt(ii),*) '      avoid use of mpi_allgather at the north fold  ln_nnogather = ', ln_nnogather  ; ii = ii + 1
224
225      CALL mpi_initialized ( mpi_was_called, code )
226      IF( code /= MPI_SUCCESS ) THEN
227         DO ji = 1, SIZE(ldtxt)
228            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
229         END DO
230         WRITE(*, cform_err)
231         WRITE(*, *) 'lib_mpp: Error in routine mpi_initialized'
232         CALL mpi_abort( mpi_comm_world, code, ierr )
233      ENDIF
234
235      IF( mpi_was_called ) THEN
236         !
237         SELECT CASE ( cn_mpi_send )
238         CASE ( 'S' )                ! Standard mpi send (blocking)
239            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'                     ;   ii = ii + 1
240         CASE ( 'B' )                ! Buffer mpi send (blocking)
241            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'                      ;   ii = ii + 1
242            IF( Agrif_Root() )   CALL mpi_init_opa( ldtxt, ii, ierr )
243         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
244            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'                   ;   ii = ii + 1
245            l_isend = .TRUE.
246         CASE DEFAULT
247            WRITE(ldtxt(ii),cform_err)                                                            ;   ii = ii + 1
248            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send             ;   ii = ii + 1
249            kstop = kstop + 1
250         END SELECT
251      ELSE IF ( PRESENT(localComm) .and. .not. mpi_was_called ) THEN
252         WRITE(ldtxt(ii),*) ' lib_mpp: You cannot provide a local communicator '                  ;   ii = ii + 1
253         WRITE(ldtxt(ii),*) '          without calling MPI_Init before ! '                        ;   ii = ii + 1
254         kstop = kstop + 1
255      ELSE
256         SELECT CASE ( cn_mpi_send )
257         CASE ( 'S' )                ! Standard mpi send (blocking)
258            WRITE(ldtxt(ii),*) '           Standard blocking mpi send (send)'                     ;   ii = ii + 1
259            CALL mpi_init( ierr )
260         CASE ( 'B' )                ! Buffer mpi send (blocking)
261            WRITE(ldtxt(ii),*) '           Buffer blocking mpi send (bsend)'                      ;   ii = ii + 1
262            IF( Agrif_Root() )   CALL mpi_init_opa( ldtxt, ii, ierr )
263         CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
264            WRITE(ldtxt(ii),*) '           Immediate non-blocking send (isend)'                   ;   ii = ii + 1
265            l_isend = .TRUE.
266            CALL mpi_init( ierr )
267         CASE DEFAULT
268            WRITE(ldtxt(ii),cform_err)                                                            ;   ii = ii + 1
269            WRITE(ldtxt(ii),*) '           bad value for cn_mpi_send = ', cn_mpi_send             ;   ii = ii + 1
270            kstop = kstop + 1
271         END SELECT
272         !
273      ENDIF
274
275      IF( PRESENT(localComm) ) THEN
276         IF( Agrif_Root() ) THEN
277            mpi_comm_opa = localComm
278         ENDIF
279      ELSE
280         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_opa, code)
281         IF( code /= MPI_SUCCESS ) THEN
282            DO ji = 1, SIZE(ldtxt)
283               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
284            END DO
285            WRITE(*, cform_err)
286            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
287            CALL mpi_abort( mpi_comm_world, code, ierr )
288         ENDIF
289      ENDIF
290
291      CALL mpi_comm_rank( mpi_comm_opa, mpprank, ierr )
292      CALL mpi_comm_size( mpi_comm_opa, mppsize, ierr )
293      mynode = mpprank
294
295      IF( mynode == 0 ) THEN
296        CALL ctl_opn( kumond, 'output.namelist.dyn', 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
297        WRITE(kumond, nammpp)     
298      ENDIF
299      !
300      CALL MPI_OP_CREATE(DDPDD_MPI, .TRUE., MPI_SUMDD, ierr)
301      !
302   END FUNCTION mynode
303
304   SUBROUTINE mpp_lnk_3d( ptab, cd_type, psgn, cd_mpp, pval )
305      !!----------------------------------------------------------------------
306      !!                  ***  routine mpp_lnk_3d  ***
307      !!
308      !! ** Purpose :   Message passing manadgement
309      !!
310      !! ** Method  :   Use mppsend and mpprecv function for passing mask
311      !!      between processors following neighboring subdomains.
312      !!            domain parameters
313      !!                    nlci   : first dimension of the local subdomain
314      !!                    nlcj   : second dimension of the local subdomain
315      !!                    nbondi : mark for "east-west local boundary"
316      !!                    nbondj : mark for "north-south local boundary"
317      !!                    noea   : number for local neighboring processors
318      !!                    nowe   : number for local neighboring processors
319      !!                    noso   : number for local neighboring processors
320      !!                    nono   : number for local neighboring processors
321      !!
322      !! ** Action  :   ptab with update value at its periphery
323      !!
324      !!----------------------------------------------------------------------
325      REAL(wp), DIMENSION(jpi,jpj,jpk), INTENT(inout) ::   ptab     ! 3D array on which the boundary condition is applied
326      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type  ! define the nature of ptab array grid-points
327      !                                                             ! = T , U , V , F , W points
328      REAL(wp)                        , INTENT(in   ) ::   psgn     ! =-1 the sign change across the north fold boundary
329      !                                                             ! =  1. , the sign is kept
330      CHARACTER(len=3), OPTIONAL      , INTENT(in   ) ::   cd_mpp   ! fill the overlap area only
331      REAL(wp)        , OPTIONAL      , INTENT(in   ) ::   pval     ! background value (used at closed boundaries)
332      !!
333      INTEGER  ::   ji, jj, jk, jl             ! dummy loop indices
334      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
335      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
336      REAL(wp) ::   zland
337      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
338      !
339      REAL(wp), DIMENSION(:,:,:,:), ALLOCATABLE ::   zt3ns, zt3sn   ! 3d for north-south & south-north
340      REAL(wp), DIMENSION(:,:,:,:), ALLOCATABLE ::   zt3ew, zt3we   ! 3d for east-west & west-east
341
342      !!----------------------------------------------------------------------
343     
344      ALLOCATE( zt3ns(jpi,jprecj,jpk,2), zt3sn(jpi,jprecj,jpk,2),   &
345         &      zt3ew(jpj,jpreci,jpk,2), zt3we(jpj,jpreci,jpk,2)  )
346
347      !
348      IF( PRESENT( pval ) ) THEN   ;   zland = pval      ! set land value
349      ELSE                         ;   zland = 0.e0      ! zero by default
350      ENDIF
351
352      ! 1. standard boundary treatment
353      ! ------------------------------
354      IF( PRESENT( cd_mpp ) ) THEN      ! only fill added line/raw with existing values
355         !
356         ! WARNING ptab is defined only between nld and nle
357         DO jk = 1, jpk
358            DO jj = nlcj+1, jpj                 ! added line(s)   (inner only)
359               ptab(nldi  :nlei  , jj          ,jk) = ptab(nldi:nlei,     nlej,jk)
360               ptab(1     :nldi-1, jj          ,jk) = ptab(nldi     ,     nlej,jk)
361               ptab(nlei+1:nlci  , jj          ,jk) = ptab(     nlei,     nlej,jk)
362            END DO
363            DO ji = nlci+1, jpi                 ! added column(s) (full)
364               ptab(ji           ,nldj  :nlej  ,jk) = ptab(     nlei,nldj:nlej,jk)
365               ptab(ji           ,1     :nldj-1,jk) = ptab(     nlei,nldj     ,jk)
366               ptab(ji           ,nlej+1:jpj   ,jk) = ptab(     nlei,     nlej,jk)
367            END DO
368         END DO
369         !
370      ELSE                              ! standard close or cyclic treatment
371         !
372         !                                   ! East-West boundaries
373         !                                        !* Cyclic east-west
374         IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
375            ptab( 1 ,:,:) = ptab(jpim1,:,:)
376            ptab(jpi,:,:) = ptab(  2  ,:,:)
377         ELSE                                     !* closed
378            IF( .NOT. cd_type == 'F' )   ptab(     1       :jpreci,:,:) = zland    ! south except F-point
379                                         ptab(nlci-jpreci+1:jpi   ,:,:) = zland    ! north
380         ENDIF
381         !                                   ! North-South boundaries (always closed)
382         IF( .NOT. cd_type == 'F' )   ptab(:,     1       :jprecj,:) = zland       ! south except F-point
383                                      ptab(:,nlcj-jprecj+1:jpj   ,:) = zland       ! north
384         !
385      ENDIF
386
387      ! 2. East and west directions exchange
388      ! ------------------------------------
389      ! we play with the neigbours AND the row number because of the periodicity
390      !
391      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
392      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
393         iihom = nlci-nreci
394         DO jl = 1, jpreci
395            zt3ew(:,jl,:,1) = ptab(jpreci+jl,:,:)
396            zt3we(:,jl,:,1) = ptab(iihom +jl,:,:)
397         END DO
398      END SELECT
399      !
400      !                           ! Migrations
401      imigr = jpreci * jpj * jpk
402      !
403      SELECT CASE ( nbondi )
404      CASE ( -1 )
405         CALL mppsend( 2, zt3we(1,1,1,1), imigr, noea, ml_req1 )
406         CALL mpprecv( 1, zt3ew(1,1,1,2), imigr, noea )
407         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
408      CASE ( 0 )
409         CALL mppsend( 1, zt3ew(1,1,1,1), imigr, nowe, ml_req1 )
410         CALL mppsend( 2, zt3we(1,1,1,1), imigr, noea, ml_req2 )
411         CALL mpprecv( 1, zt3ew(1,1,1,2), imigr, noea )
412         CALL mpprecv( 2, zt3we(1,1,1,2), imigr, nowe )
413         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
414         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
415      CASE ( 1 )
416         CALL mppsend( 1, zt3ew(1,1,1,1), imigr, nowe, ml_req1 )
417         CALL mpprecv( 2, zt3we(1,1,1,2), imigr, nowe )
418         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
419      END SELECT
420      !
421      !                           ! Write Dirichlet lateral conditions
422      iihom = nlci-jpreci
423      !
424      SELECT CASE ( nbondi )
425      CASE ( -1 )
426         DO jl = 1, jpreci
427            ptab(iihom+jl,:,:) = zt3ew(:,jl,:,2)
428         END DO
429      CASE ( 0 )
430         DO jl = 1, jpreci
431            ptab(jl      ,:,:) = zt3we(:,jl,:,2)
432            ptab(iihom+jl,:,:) = zt3ew(:,jl,:,2)
433         END DO
434      CASE ( 1 )
435         DO jl = 1, jpreci
436            ptab(jl      ,:,:) = zt3we(:,jl,:,2)
437         END DO
438      END SELECT
439
440
441      ! 3. North and south directions
442      ! -----------------------------
443      ! always closed : we play only with the neigbours
444      !
445      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
446         ijhom = nlcj-nrecj
447         DO jl = 1, jprecj
448            zt3sn(:,jl,:,1) = ptab(:,ijhom +jl,:)
449            zt3ns(:,jl,:,1) = ptab(:,jprecj+jl,:)
450         END DO
451      ENDIF
452      !
453      !                           ! Migrations
454      imigr = jprecj * jpi * jpk
455      !
456      SELECT CASE ( nbondj )
457      CASE ( -1 )
458         CALL mppsend( 4, zt3sn(1,1,1,1), imigr, nono, ml_req1 )
459         CALL mpprecv( 3, zt3ns(1,1,1,2), imigr, nono )
460         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
461      CASE ( 0 )
462         CALL mppsend( 3, zt3ns(1,1,1,1), imigr, noso, ml_req1 )
463         CALL mppsend( 4, zt3sn(1,1,1,1), imigr, nono, ml_req2 )
464         CALL mpprecv( 3, zt3ns(1,1,1,2), imigr, nono )
465         CALL mpprecv( 4, zt3sn(1,1,1,2), imigr, noso )
466         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
467         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
468      CASE ( 1 )
469         CALL mppsend( 3, zt3ns(1,1,1,1), imigr, noso, ml_req1 )
470         CALL mpprecv( 4, zt3sn(1,1,1,2), imigr, noso )
471         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
472      END SELECT
473      !
474      !                           ! Write Dirichlet lateral conditions
475      ijhom = nlcj-jprecj
476      !
477      SELECT CASE ( nbondj )
478      CASE ( -1 )
479         DO jl = 1, jprecj
480            ptab(:,ijhom+jl,:) = zt3ns(:,jl,:,2)
481         END DO
482      CASE ( 0 )
483         DO jl = 1, jprecj
484            ptab(:,jl      ,:) = zt3sn(:,jl,:,2)
485            ptab(:,ijhom+jl,:) = zt3ns(:,jl,:,2)
486         END DO
487      CASE ( 1 )
488         DO jl = 1, jprecj
489            ptab(:,jl,:) = zt3sn(:,jl,:,2)
490         END DO
491      END SELECT
492
493
494      ! 4. north fold treatment
495      ! -----------------------
496      !
497      IF( npolj /= 0 .AND. .NOT. PRESENT(cd_mpp) ) THEN
498         !
499         SELECT CASE ( jpni )
500         CASE ( 1 )     ;   CALL lbc_nfd      ( ptab, cd_type, psgn )   ! only 1 northern proc, no mpp
501         CASE DEFAULT   ;   CALL mpp_lbc_north( ptab, cd_type, psgn )   ! for all northern procs.
502         END SELECT
503         !
504      ENDIF
505      !
506      DEALLOCATE( zt3ns, zt3sn, zt3ew, zt3we )
507      !
508   END SUBROUTINE mpp_lnk_3d
509
510
511   SUBROUTINE mpp_lnk_2d( pt2d, cd_type, psgn, cd_mpp, pval )
512      !!----------------------------------------------------------------------
513      !!                  ***  routine mpp_lnk_2d  ***
514      !!
515      !! ** Purpose :   Message passing manadgement for 2d array
516      !!
517      !! ** Method  :   Use mppsend and mpprecv function for passing mask
518      !!      between processors following neighboring subdomains.
519      !!            domain parameters
520      !!                    nlci   : first dimension of the local subdomain
521      !!                    nlcj   : second dimension of the local subdomain
522      !!                    nbondi : mark for "east-west local boundary"
523      !!                    nbondj : mark for "north-south local boundary"
524      !!                    noea   : number for local neighboring processors
525      !!                    nowe   : number for local neighboring processors
526      !!                    noso   : number for local neighboring processors
527      !!                    nono   : number for local neighboring processors
528      !!
529      !!----------------------------------------------------------------------
530      REAL(wp), DIMENSION(jpi,jpj), INTENT(inout) ::   pt2d     ! 2D array on which the boundary condition is applied
531      CHARACTER(len=1)            , INTENT(in   ) ::   cd_type  ! define the nature of ptab array grid-points
532      !                                                         ! = T , U , V , F , W and I points
533      REAL(wp)                    , INTENT(in   ) ::   psgn     ! =-1 the sign change across the north fold boundary
534      !                                                         ! =  1. , the sign is kept
535      CHARACTER(len=3), OPTIONAL  , INTENT(in   ) ::   cd_mpp   ! fill the overlap area only
536      REAL(wp)        , OPTIONAL  , INTENT(in   ) ::   pval     ! background value (used at closed boundaries)
537      !!
538      INTEGER  ::   ji, jj, jl   ! dummy loop indices
539      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
540      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
541      REAL(wp) ::   zland
542      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
543      !
544      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE ::  zt2ns, zt2sn   ! 2d for north-south & south-north
545      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE ::  zt2ew, zt2we   ! 2d for east-west & west-east
546
547      !!----------------------------------------------------------------------
548
549      ALLOCATE( zt2ns(jpi,jprecj,2), zt2sn(jpi,jprecj,2),  &
550         &      zt2ew(jpj,jpreci,2), zt2we(jpj,jpreci,2)   )
551
552      !
553      IF( PRESENT( pval ) ) THEN   ;   zland = pval      ! set land value
554      ELSE                         ;   zland = 0.e0      ! zero by default
555      ENDIF
556
557      ! 1. standard boundary treatment
558      ! ------------------------------
559      !
560      IF( PRESENT( cd_mpp ) ) THEN      ! only fill added line/raw with existing values
561         !
562         ! WARNING pt2d is defined only between nld and nle
563         DO jj = nlcj+1, jpj                 ! added line(s)   (inner only)
564            pt2d(nldi  :nlei  , jj          ) = pt2d(nldi:nlei,     nlej)
565            pt2d(1     :nldi-1, jj          ) = pt2d(nldi     ,     nlej)
566            pt2d(nlei+1:nlci  , jj          ) = pt2d(     nlei,     nlej)
567         END DO
568         DO ji = nlci+1, jpi                 ! added column(s) (full)
569            pt2d(ji           ,nldj  :nlej  ) = pt2d(     nlei,nldj:nlej)
570            pt2d(ji           ,1     :nldj-1) = pt2d(     nlei,nldj     )
571            pt2d(ji           ,nlej+1:jpj   ) = pt2d(     nlei,     nlej)
572         END DO
573         !
574      ELSE                              ! standard close or cyclic treatment
575         !
576         !                                   ! East-West boundaries
577         IF( nbondi == 2 .AND.   &                ! Cyclic east-west
578            &    (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
579            pt2d( 1 ,:) = pt2d(jpim1,:)                                    ! west
580            pt2d(jpi,:) = pt2d(  2  ,:)                                    ! east
581         ELSE                                     ! closed
582            IF( .NOT. cd_type == 'F' )   pt2d(     1       :jpreci,:) = zland    ! south except F-point
583                                         pt2d(nlci-jpreci+1:jpi   ,:) = zland    ! north
584         ENDIF
585         !                                   ! North-South boundaries (always closed)
586            IF( .NOT. cd_type == 'F' )   pt2d(:,     1       :jprecj) = zland    !south except F-point
587                                         pt2d(:,nlcj-jprecj+1:jpj   ) = zland    ! north
588         !
589      ENDIF
590
591      ! 2. East and west directions exchange
592      ! ------------------------------------
593      ! we play with the neigbours AND the row number because of the periodicity
594      !
595      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
596      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
597         iihom = nlci-nreci
598         DO jl = 1, jpreci
599            zt2ew(:,jl,1) = pt2d(jpreci+jl,:)
600            zt2we(:,jl,1) = pt2d(iihom +jl,:)
601         END DO
602      END SELECT
603      !
604      !                           ! Migrations
605      imigr = jpreci * jpj
606      !
607      SELECT CASE ( nbondi )
608      CASE ( -1 )
609         CALL mppsend( 2, zt2we(1,1,1), imigr, noea, ml_req1 )
610         CALL mpprecv( 1, zt2ew(1,1,2), imigr, noea )
611         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
612      CASE ( 0 )
613         CALL mppsend( 1, zt2ew(1,1,1), imigr, nowe, ml_req1 )
614         CALL mppsend( 2, zt2we(1,1,1), imigr, noea, ml_req2 )
615         CALL mpprecv( 1, zt2ew(1,1,2), imigr, noea )
616         CALL mpprecv( 2, zt2we(1,1,2), imigr, nowe )
617         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
618         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
619      CASE ( 1 )
620         CALL mppsend( 1, zt2ew(1,1,1), imigr, nowe, ml_req1 )
621         CALL mpprecv( 2, zt2we(1,1,2), imigr, nowe )
622         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
623      END SELECT
624      !
625      !                           ! Write Dirichlet lateral conditions
626      iihom = nlci - jpreci
627      !
628      SELECT CASE ( nbondi )
629      CASE ( -1 )
630         DO jl = 1, jpreci
631            pt2d(iihom+jl,:) = zt2ew(:,jl,2)
632         END DO
633      CASE ( 0 )
634         DO jl = 1, jpreci
635            pt2d(jl      ,:) = zt2we(:,jl,2)
636            pt2d(iihom+jl,:) = zt2ew(:,jl,2)
637         END DO
638      CASE ( 1 )
639         DO jl = 1, jpreci
640            pt2d(jl      ,:) = zt2we(:,jl,2)
641         END DO
642      END SELECT
643
644
645      ! 3. North and south directions
646      ! -----------------------------
647      ! always closed : we play only with the neigbours
648      !
649      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
650         ijhom = nlcj-nrecj
651         DO jl = 1, jprecj
652            zt2sn(:,jl,1) = pt2d(:,ijhom +jl)
653            zt2ns(:,jl,1) = pt2d(:,jprecj+jl)
654         END DO
655      ENDIF
656      !
657      !                           ! Migrations
658      imigr = jprecj * jpi
659      !
660      SELECT CASE ( nbondj )
661      CASE ( -1 )
662         CALL mppsend( 4, zt2sn(1,1,1), imigr, nono, ml_req1 )
663         CALL mpprecv( 3, zt2ns(1,1,2), imigr, nono )
664         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
665      CASE ( 0 )
666         CALL mppsend( 3, zt2ns(1,1,1), imigr, noso, ml_req1 )
667         CALL mppsend( 4, zt2sn(1,1,1), imigr, nono, ml_req2 )
668         CALL mpprecv( 3, zt2ns(1,1,2), imigr, nono )
669         CALL mpprecv( 4, zt2sn(1,1,2), imigr, noso )
670         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
671         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
672      CASE ( 1 )
673         CALL mppsend( 3, zt2ns(1,1,1), imigr, noso, ml_req1 )
674         CALL mpprecv( 4, zt2sn(1,1,2), imigr, noso )
675         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
676      END SELECT
677      !
678      !                           ! Write Dirichlet lateral conditions
679      ijhom = nlcj - jprecj
680      !
681      SELECT CASE ( nbondj )
682      CASE ( -1 )
683         DO jl = 1, jprecj
684            pt2d(:,ijhom+jl) = zt2ns(:,jl,2)
685         END DO
686      CASE ( 0 )
687         DO jl = 1, jprecj
688            pt2d(:,jl      ) = zt2sn(:,jl,2)
689            pt2d(:,ijhom+jl) = zt2ns(:,jl,2)
690         END DO
691      CASE ( 1 )
692         DO jl = 1, jprecj
693            pt2d(:,jl      ) = zt2sn(:,jl,2)
694         END DO
695      END SELECT
696
697
698      ! 4. north fold treatment
699      ! -----------------------
700      !
701      IF( npolj /= 0 .AND. .NOT. PRESENT(cd_mpp) ) THEN
702         !
703         SELECT CASE ( jpni )
704         CASE ( 1 )     ;   CALL lbc_nfd      ( pt2d, cd_type, psgn )   ! only 1 northern proc, no mpp
705         CASE DEFAULT   ;   CALL mpp_lbc_north( pt2d, cd_type, psgn )   ! for all northern procs.
706         END SELECT
707         !
708      ENDIF
709      !
710      DEALLOCATE( zt2ns, zt2sn, zt2ew, zt2we )
711      !
712   END SUBROUTINE mpp_lnk_2d
713
714
715   SUBROUTINE mpp_lnk_3d_gather( ptab1, cd_type1, ptab2, cd_type2, psgn )
716      !!----------------------------------------------------------------------
717      !!                  ***  routine mpp_lnk_3d_gather  ***
718      !!
719      !! ** Purpose :   Message passing manadgement for two 3D arrays
720      !!
721      !! ** Method  :   Use mppsend and mpprecv function for passing mask
722      !!      between processors following neighboring subdomains.
723      !!            domain parameters
724      !!                    nlci   : first dimension of the local subdomain
725      !!                    nlcj   : second dimension of the local subdomain
726      !!                    nbondi : mark for "east-west local boundary"
727      !!                    nbondj : mark for "north-south local boundary"
728      !!                    noea   : number for local neighboring processors
729      !!                    nowe   : number for local neighboring processors
730      !!                    noso   : number for local neighboring processors
731      !!                    nono   : number for local neighboring processors
732      !!
733      !! ** Action  :   ptab1 and ptab2  with update value at its periphery
734      !!
735      !!----------------------------------------------------------------------
736      REAL(wp), DIMENSION(jpi,jpj,jpk), INTENT(inout) ::   ptab1     ! first and second 3D array on which
737      REAL(wp), DIMENSION(jpi,jpj,jpk), INTENT(inout) ::   ptab2     ! the boundary condition is applied
738      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type1  ! nature of ptab1 and ptab2 arrays
739      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type2  ! i.e. grid-points = T , U , V , F or W points
740      REAL(wp)                        , INTENT(in   ) ::   psgn      ! =-1 the sign change across the north fold boundary
741      !!                                                             ! =  1. , the sign is kept
742      INTEGER  ::   jl   ! dummy loop indices
743      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
744      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
745      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
746      !
747      REAL(wp), DIMENSION(:,:,:,:,:), ALLOCATABLE ::   zt4ns, zt4sn   ! 2 x 3d for north-south & south-north
748      REAL(wp), DIMENSION(:,:,:,:,:), ALLOCATABLE ::   zt4ew, zt4we   ! 2 x 3d for east-west & west-east
749
750      !!----------------------------------------------------------------------
751      ALLOCATE( zt4ns(jpi,jprecj,jpk,2,2), zt4sn(jpi,jprecj,jpk,2,2) ,    &
752         &      zt4ew(jpj,jpreci,jpk,2,2), zt4we(jpj,jpreci,jpk,2,2) )
753
754
755      ! 1. standard boundary treatment
756      ! ------------------------------
757      !                                      ! East-West boundaries
758      !                                           !* Cyclic east-west
759      IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
760         ptab1( 1 ,:,:) = ptab1(jpim1,:,:)
761         ptab1(jpi,:,:) = ptab1(  2  ,:,:)
762         ptab2( 1 ,:,:) = ptab2(jpim1,:,:)
763         ptab2(jpi,:,:) = ptab2(  2  ,:,:)
764      ELSE                                        !* closed
765         IF( .NOT. cd_type1 == 'F' )   ptab1(     1       :jpreci,:,:) = 0.e0    ! south except at F-point
766         IF( .NOT. cd_type2 == 'F' )   ptab2(     1       :jpreci,:,:) = 0.e0
767                                       ptab1(nlci-jpreci+1:jpi   ,:,:) = 0.e0    ! north
768                                       ptab2(nlci-jpreci+1:jpi   ,:,:) = 0.e0
769      ENDIF
770
771
772      !                                      ! North-South boundaries
773      IF( .NOT. cd_type1 == 'F' )   ptab1(:,     1       :jprecj,:) = 0.e0    ! south except at F-point
774      IF( .NOT. cd_type2 == 'F' )   ptab2(:,     1       :jprecj,:) = 0.e0
775                                    ptab1(:,nlcj-jprecj+1:jpj   ,:) = 0.e0    ! north
776                                    ptab2(:,nlcj-jprecj+1:jpj   ,:) = 0.e0
777
778
779      ! 2. East and west directions exchange
780      ! ------------------------------------
781      ! we play with the neigbours AND the row number because of the periodicity
782      !
783      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
784      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
785         iihom = nlci-nreci
786         DO jl = 1, jpreci
787            zt4ew(:,jl,:,1,1) = ptab1(jpreci+jl,:,:)
788            zt4we(:,jl,:,1,1) = ptab1(iihom +jl,:,:)
789            zt4ew(:,jl,:,2,1) = ptab2(jpreci+jl,:,:)
790            zt4we(:,jl,:,2,1) = ptab2(iihom +jl,:,:)
791         END DO
792      END SELECT
793      !
794      !                           ! Migrations
795      imigr = jpreci * jpj * jpk *2
796      !
797      SELECT CASE ( nbondi )
798      CASE ( -1 )
799         CALL mppsend( 2, zt4we(1,1,1,1,1), imigr, noea, ml_req1 )
800         CALL mpprecv( 1, zt4ew(1,1,1,1,2), imigr, noea )
801         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
802      CASE ( 0 )
803         CALL mppsend( 1, zt4ew(1,1,1,1,1), imigr, nowe, ml_req1 )
804         CALL mppsend( 2, zt4we(1,1,1,1,1), imigr, noea, ml_req2 )
805         CALL mpprecv( 1, zt4ew(1,1,1,1,2), imigr, noea )
806         CALL mpprecv( 2, zt4we(1,1,1,1,2), imigr, nowe )
807         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
808         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
809      CASE ( 1 )
810         CALL mppsend( 1, zt4ew(1,1,1,1,1), imigr, nowe, ml_req1 )
811         CALL mpprecv( 2, zt4we(1,1,1,1,2), imigr, nowe )
812         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
813      END SELECT
814      !
815      !                           ! Write Dirichlet lateral conditions
816      iihom = nlci - jpreci
817      !
818      SELECT CASE ( nbondi )
819      CASE ( -1 )
820         DO jl = 1, jpreci
821            ptab1(iihom+jl,:,:) = zt4ew(:,jl,:,1,2)
822            ptab2(iihom+jl,:,:) = zt4ew(:,jl,:,2,2)
823         END DO
824      CASE ( 0 )
825         DO jl = 1, jpreci
826            ptab1(jl      ,:,:) = zt4we(:,jl,:,1,2)
827            ptab1(iihom+jl,:,:) = zt4ew(:,jl,:,1,2)
828            ptab2(jl      ,:,:) = zt4we(:,jl,:,2,2)
829            ptab2(iihom+jl,:,:) = zt4ew(:,jl,:,2,2)
830         END DO
831      CASE ( 1 )
832         DO jl = 1, jpreci
833            ptab1(jl      ,:,:) = zt4we(:,jl,:,1,2)
834            ptab2(jl      ,:,:) = zt4we(:,jl,:,2,2)
835         END DO
836      END SELECT
837
838
839      ! 3. North and south directions
840      ! -----------------------------
841      ! always closed : we play only with the neigbours
842      !
843      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
844         ijhom = nlcj - nrecj
845         DO jl = 1, jprecj
846            zt4sn(:,jl,:,1,1) = ptab1(:,ijhom +jl,:)
847            zt4ns(:,jl,:,1,1) = ptab1(:,jprecj+jl,:)
848            zt4sn(:,jl,:,2,1) = ptab2(:,ijhom +jl,:)
849            zt4ns(:,jl,:,2,1) = ptab2(:,jprecj+jl,:)
850         END DO
851      ENDIF
852      !
853      !                           ! Migrations
854      imigr = jprecj * jpi * jpk * 2
855      !
856      SELECT CASE ( nbondj )
857      CASE ( -1 )
858         CALL mppsend( 4, zt4sn(1,1,1,1,1), imigr, nono, ml_req1 )
859         CALL mpprecv( 3, zt4ns(1,1,1,1,2), imigr, nono )
860         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
861      CASE ( 0 )
862         CALL mppsend( 3, zt4ns(1,1,1,1,1), imigr, noso, ml_req1 )
863         CALL mppsend( 4, zt4sn(1,1,1,1,1), imigr, nono, ml_req2 )
864         CALL mpprecv( 3, zt4ns(1,1,1,1,2), imigr, nono )
865         CALL mpprecv( 4, zt4sn(1,1,1,1,2), imigr, noso )
866         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
867         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
868      CASE ( 1 )
869         CALL mppsend( 3, zt4ns(1,1,1,1,1), imigr, noso, ml_req1 )
870         CALL mpprecv( 4, zt4sn(1,1,1,1,2), imigr, noso )
871         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
872      END SELECT
873      !
874      !                           ! Write Dirichlet lateral conditions
875      ijhom = nlcj - jprecj
876      !
877      SELECT CASE ( nbondj )
878      CASE ( -1 )
879         DO jl = 1, jprecj
880            ptab1(:,ijhom+jl,:) = zt4ns(:,jl,:,1,2)
881            ptab2(:,ijhom+jl,:) = zt4ns(:,jl,:,2,2)
882         END DO
883      CASE ( 0 )
884         DO jl = 1, jprecj
885            ptab1(:,jl      ,:) = zt4sn(:,jl,:,1,2)
886            ptab1(:,ijhom+jl,:) = zt4ns(:,jl,:,1,2)
887            ptab2(:,jl      ,:) = zt4sn(:,jl,:,2,2)
888            ptab2(:,ijhom+jl,:) = zt4ns(:,jl,:,2,2)
889         END DO
890      CASE ( 1 )
891         DO jl = 1, jprecj
892            ptab1(:,jl,:) = zt4sn(:,jl,:,1,2)
893            ptab2(:,jl,:) = zt4sn(:,jl,:,2,2)
894         END DO
895      END SELECT
896
897
898      ! 4. north fold treatment
899      ! -----------------------
900      IF( npolj /= 0 ) THEN
901         !
902         SELECT CASE ( jpni )
903         CASE ( 1 )
904            CALL lbc_nfd      ( ptab1, cd_type1, psgn )   ! only for northern procs.
905            CALL lbc_nfd      ( ptab2, cd_type2, psgn )
906         CASE DEFAULT
907            CALL mpp_lbc_north( ptab1, cd_type1, psgn )   ! for all northern procs.
908            CALL mpp_lbc_north (ptab2, cd_type2, psgn)
909         END SELECT
910         !
911      ENDIF
912      !
913      DEALLOCATE( zt4ns, zt4sn, zt4ew, zt4we )
914      !
915   END SUBROUTINE mpp_lnk_3d_gather
916
917
918   SUBROUTINE mpp_lnk_2d_e( pt2d, cd_type, psgn, jpri, jprj )
919      !!----------------------------------------------------------------------
920      !!                  ***  routine mpp_lnk_2d_e  ***
921      !!
922      !! ** Purpose :   Message passing manadgement for 2d array (with halo)
923      !!
924      !! ** Method  :   Use mppsend and mpprecv function for passing mask
925      !!      between processors following neighboring subdomains.
926      !!            domain parameters
927      !!                    nlci   : first dimension of the local subdomain
928      !!                    nlcj   : second dimension of the local subdomain
929      !!                    jpri   : number of rows for extra outer halo
930      !!                    jprj   : number of columns for extra outer halo
931      !!                    nbondi : mark for "east-west local boundary"
932      !!                    nbondj : mark for "north-south local boundary"
933      !!                    noea   : number for local neighboring processors
934      !!                    nowe   : number for local neighboring processors
935      !!                    noso   : number for local neighboring processors
936      !!                    nono   : number for local neighboring processors
937      !!
938      !!----------------------------------------------------------------------
939      INTEGER                                             , INTENT(in   ) ::   jpri
940      INTEGER                                             , INTENT(in   ) ::   jprj
941      REAL(wp), DIMENSION(1-jpri:jpi+jpri,1-jprj:jpj+jprj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
942      CHARACTER(len=1)                                    , INTENT(in   ) ::   cd_type  ! nature of ptab array grid-points
943      !                                                                                 ! = T , U , V , F , W and I points
944      REAL(wp)                                            , INTENT(in   ) ::   psgn     ! =-1 the sign change across the
945      !!                                                                                ! north boundary, =  1. otherwise
946      INTEGER  ::   jl   ! dummy loop indices
947      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
948      INTEGER  ::   ipreci, iprecj             ! temporary integers
949      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
950      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
951      !!
952      REAL(wp), DIMENSION(1-jpri:jpi+jpri,jprecj+jprj,2) :: r2dns
953      REAL(wp), DIMENSION(1-jpri:jpi+jpri,jprecj+jprj,2) :: r2dsn
954      REAL(wp), DIMENSION(1-jprj:jpj+jprj,jpreci+jpri,2) :: r2dwe
955      REAL(wp), DIMENSION(1-jprj:jpj+jprj,jpreci+jpri,2) :: r2dew
956      !!----------------------------------------------------------------------
957
958      ipreci = jpreci + jpri      ! take into account outer extra 2D overlap area
959      iprecj = jprecj + jprj
960
961
962      ! 1. standard boundary treatment
963      ! ------------------------------
964      ! Order matters Here !!!!
965      !
966      !                                      !* North-South boundaries (always colsed)
967      IF( .NOT. cd_type == 'F' )   pt2d(:,  1-jprj   :  jprecj  ) = 0.e0    ! south except at F-point
968                                   pt2d(:,nlcj-jprecj+1:jpj+jprj) = 0.e0    ! north
969
970      !                                      ! East-West boundaries
971      !                                           !* Cyclic east-west
972      IF( nbondi == 2 .AND. (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) ) THEN
973         pt2d(1-jpri:     1    ,:) = pt2d(jpim1-jpri:  jpim1 ,:)       ! east
974         pt2d(   jpi  :jpi+jpri,:) = pt2d(     2      :2+jpri,:)       ! west
975         !
976      ELSE                                        !* closed
977         IF( .NOT. cd_type == 'F' )   pt2d(  1-jpri   :jpreci    ,:) = 0.e0    ! south except at F-point
978                                      pt2d(nlci-jpreci+1:jpi+jpri,:) = 0.e0    ! north
979      ENDIF
980      !
981
982      ! north fold treatment
983      ! -----------------------
984      IF( npolj /= 0 ) THEN
985         !
986         SELECT CASE ( jpni )
987         CASE ( 1 )     ;   CALL lbc_nfd        ( pt2d(1:jpi,1:jpj+jprj), cd_type, psgn, pr2dj=jprj )
988         CASE DEFAULT   ;   CALL mpp_lbc_north_e( pt2d                    , cd_type, psgn               )
989         END SELECT
990         !
991      ENDIF
992
993      ! 2. East and west directions exchange
994      ! ------------------------------------
995      ! we play with the neigbours AND the row number because of the periodicity
996      !
997      SELECT CASE ( nbondi )      ! Read Dirichlet lateral conditions
998      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
999         iihom = nlci-nreci-jpri
1000         DO jl = 1, ipreci
1001            r2dew(:,jl,1) = pt2d(jpreci+jl,:)
1002            r2dwe(:,jl,1) = pt2d(iihom +jl,:)
1003         END DO
1004      END SELECT
1005      !
1006      !                           ! Migrations
1007      imigr = ipreci * ( jpj + 2*jprj)
1008      !
1009      SELECT CASE ( nbondi )
1010      CASE ( -1 )
1011         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req1 )
1012         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
1013         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1014      CASE ( 0 )
1015         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
1016         CALL mppsend( 2, r2dwe(1-jprj,1,1), imigr, noea, ml_req2 )
1017         CALL mpprecv( 1, r2dew(1-jprj,1,2), imigr, noea )
1018         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
1019         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1020         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1021      CASE ( 1 )
1022         CALL mppsend( 1, r2dew(1-jprj,1,1), imigr, nowe, ml_req1 )
1023         CALL mpprecv( 2, r2dwe(1-jprj,1,2), imigr, nowe )
1024         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1025      END SELECT
1026      !
1027      !                           ! Write Dirichlet lateral conditions
1028      iihom = nlci - jpreci
1029      !
1030      SELECT CASE ( nbondi )
1031      CASE ( -1 )
1032         DO jl = 1, ipreci
1033            pt2d(iihom+jl,:) = r2dew(:,jl,2)
1034         END DO
1035      CASE ( 0 )
1036         DO jl = 1, ipreci
1037            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
1038            pt2d( iihom+jl,:) = r2dew(:,jl,2)
1039         END DO
1040      CASE ( 1 )
1041         DO jl = 1, ipreci
1042            pt2d(jl-jpri,:) = r2dwe(:,jl,2)
1043         END DO
1044      END SELECT
1045
1046
1047      ! 3. North and south directions
1048      ! -----------------------------
1049      ! always closed : we play only with the neigbours
1050      !
1051      IF( nbondj /= 2 ) THEN      ! Read Dirichlet lateral conditions
1052         ijhom = nlcj-nrecj-jprj
1053         DO jl = 1, iprecj
1054            r2dsn(:,jl,1) = pt2d(:,ijhom +jl)
1055            r2dns(:,jl,1) = pt2d(:,jprecj+jl)
1056         END DO
1057      ENDIF
1058      !
1059      !                           ! Migrations
1060      imigr = iprecj * ( jpi + 2*jpri )
1061      !
1062      SELECT CASE ( nbondj )
1063      CASE ( -1 )
1064         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req1 )
1065         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
1066         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1067      CASE ( 0 )
1068         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
1069         CALL mppsend( 4, r2dsn(1-jpri,1,1), imigr, nono, ml_req2 )
1070         CALL mpprecv( 3, r2dns(1-jpri,1,2), imigr, nono )
1071         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
1072         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1073         IF(l_isend) CALL mpi_wait(ml_req2,ml_stat,ml_err)
1074      CASE ( 1 )
1075         CALL mppsend( 3, r2dns(1-jpri,1,1), imigr, noso, ml_req1 )
1076         CALL mpprecv( 4, r2dsn(1-jpri,1,2), imigr, noso )
1077         IF(l_isend) CALL mpi_wait(ml_req1,ml_stat,ml_err)
1078      END SELECT
1079      !
1080      !                           ! Write Dirichlet lateral conditions
1081      ijhom = nlcj - jprecj
1082      !
1083      SELECT CASE ( nbondj )
1084      CASE ( -1 )
1085         DO jl = 1, iprecj
1086            pt2d(:,ijhom+jl) = r2dns(:,jl,2)
1087         END DO
1088      CASE ( 0 )
1089         DO jl = 1, iprecj
1090            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
1091            pt2d(:,ijhom+jl ) = r2dns(:,jl,2)
1092         END DO
1093      CASE ( 1 )
1094         DO jl = 1, iprecj
1095            pt2d(:,jl-jprj) = r2dsn(:,jl,2)
1096         END DO
1097      END SELECT
1098
1099   END SUBROUTINE mpp_lnk_2d_e
1100
1101
1102   SUBROUTINE mppsend( ktyp, pmess, kbytes, kdest, md_req )
1103      !!----------------------------------------------------------------------
1104      !!                  ***  routine mppsend  ***
1105      !!
1106      !! ** Purpose :   Send messag passing array
1107      !!
1108      !!----------------------------------------------------------------------
1109      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
1110      INTEGER , INTENT(in   ) ::   kbytes     ! size of the array pmess
1111      INTEGER , INTENT(in   ) ::   kdest      ! receive process number
1112      INTEGER , INTENT(in   ) ::   ktyp       ! tag of the message
1113      INTEGER , INTENT(in   ) ::   md_req     ! argument for isend
1114      !!
1115      INTEGER ::   iflag
1116      !!----------------------------------------------------------------------
1117      !
1118      SELECT CASE ( cn_mpi_send )
1119      CASE ( 'S' )                ! Standard mpi send (blocking)
1120         CALL mpi_send ( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa        , iflag )
1121      CASE ( 'B' )                ! Buffer mpi send (blocking)
1122         CALL mpi_bsend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa        , iflag )
1123      CASE ( 'I' )                ! Immediate mpi send (non-blocking send)
1124         ! be carefull, one more argument here : the mpi request identifier..
1125         CALL mpi_isend( pmess, kbytes, mpi_double_precision, kdest , ktyp, mpi_comm_opa, md_req, iflag )
1126      END SELECT
1127      !
1128   END SUBROUTINE mppsend
1129
1130
1131   SUBROUTINE mpprecv( ktyp, pmess, kbytes, ksource )
1132      !!----------------------------------------------------------------------
1133      !!                  ***  routine mpprecv  ***
1134      !!
1135      !! ** Purpose :   Receive messag passing array
1136      !!
1137      !!----------------------------------------------------------------------
1138      REAL(wp), INTENT(inout) ::   pmess(*)   ! array of real
1139      INTEGER , INTENT(in   ) ::   kbytes     ! suze of the array pmess
1140      INTEGER , INTENT(in   ) ::   ktyp       ! Tag of the recevied message
1141      INTEGER, OPTIONAL, INTENT(in) :: ksource    ! source process number
1142      !!
1143      INTEGER :: istatus(mpi_status_size)
1144      INTEGER :: iflag
1145      INTEGER :: use_source
1146      !!----------------------------------------------------------------------
1147      !
1148
1149      ! If a specific process number has been passed to the receive call,
1150      ! use that one. Default is to use mpi_any_source
1151      use_source=mpi_any_source
1152      if(present(ksource)) then
1153         use_source=ksource
1154      end if
1155
1156      CALL mpi_recv( pmess, kbytes, mpi_double_precision, use_source, ktyp, mpi_comm_opa, istatus, iflag )
1157      !
1158   END SUBROUTINE mpprecv
1159
1160
1161   SUBROUTINE mppgather( ptab, kp, pio )
1162      !!----------------------------------------------------------------------
1163      !!                   ***  routine mppgather  ***
1164      !!
1165      !! ** Purpose :   Transfert between a local subdomain array and a work
1166      !!     array which is distributed following the vertical level.
1167      !!
1168      !!----------------------------------------------------------------------
1169      REAL(wp), DIMENSION(jpi,jpj),       INTENT(in   ) ::   ptab   ! subdomain input array
1170      INTEGER ,                           INTENT(in   ) ::   kp     ! record length
1171      REAL(wp), DIMENSION(jpi,jpj,jpnij), INTENT(  out) ::   pio    ! subdomain input array
1172      !!
1173      INTEGER :: itaille, ierror   ! temporary integer
1174      !!---------------------------------------------------------------------
1175      !
1176      itaille = jpi * jpj
1177      CALL mpi_gather( ptab, itaille, mpi_double_precision, pio, itaille     ,   &
1178         &                            mpi_double_precision, kp , mpi_comm_opa, ierror )
1179      !
1180   END SUBROUTINE mppgather
1181
1182
1183   SUBROUTINE mppscatter( pio, kp, ptab )
1184      !!----------------------------------------------------------------------
1185      !!                  ***  routine mppscatter  ***
1186      !!
1187      !! ** Purpose :   Transfert between awork array which is distributed
1188      !!      following the vertical level and the local subdomain array.
1189      !!
1190      !!----------------------------------------------------------------------
1191      REAL(wp), DIMENSION(jpi,jpj,jpnij)  ::  pio        ! output array
1192      INTEGER                             ::   kp        ! Tag (not used with MPI
1193      REAL(wp), DIMENSION(jpi,jpj)        ::  ptab       ! subdomain array input
1194      !!
1195      INTEGER :: itaille, ierror   ! temporary integer
1196      !!---------------------------------------------------------------------
1197      !
1198      itaille=jpi*jpj
1199      !
1200      CALL mpi_scatter( pio, itaille, mpi_double_precision, ptab, itaille     ,   &
1201         &                            mpi_double_precision, kp  , mpi_comm_opa, ierror )
1202      !
1203   END SUBROUTINE mppscatter
1204
1205
1206   SUBROUTINE mppmax_a_int( ktab, kdim, kcom )
1207      !!----------------------------------------------------------------------
1208      !!                  ***  routine mppmax_a_int  ***
1209      !!
1210      !! ** Purpose :   Find maximum value in an integer layout array
1211      !!
1212      !!----------------------------------------------------------------------
1213      INTEGER , INTENT(in   )                  ::   kdim   ! size of array
1214      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab   ! input array
1215      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom   !
1216      !!
1217      INTEGER :: ierror, localcomm   ! temporary integer
1218      INTEGER, DIMENSION(kdim) ::   iwork
1219      !!----------------------------------------------------------------------
1220      !
1221      localcomm = mpi_comm_opa
1222      IF( PRESENT(kcom) )   localcomm = kcom
1223      !
1224      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_max, localcomm, ierror )
1225      !
1226      ktab(:) = iwork(:)
1227      !
1228   END SUBROUTINE mppmax_a_int
1229
1230
1231   SUBROUTINE mppmax_int( ktab, kcom )
1232      !!----------------------------------------------------------------------
1233      !!                  ***  routine mppmax_int  ***
1234      !!
1235      !! ** Purpose :   Find maximum value in an integer layout array
1236      !!
1237      !!----------------------------------------------------------------------
1238      INTEGER, INTENT(inout)           ::   ktab      ! ???
1239      INTEGER, INTENT(in   ), OPTIONAL ::   kcom      ! ???
1240      !!
1241      INTEGER ::   ierror, iwork, localcomm   ! temporary integer
1242      !!----------------------------------------------------------------------
1243      !
1244      localcomm = mpi_comm_opa
1245      IF( PRESENT(kcom) )   localcomm = kcom
1246      !
1247      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_max, localcomm, ierror)
1248      !
1249      ktab = iwork
1250      !
1251   END SUBROUTINE mppmax_int
1252
1253
1254   SUBROUTINE mppmin_a_int( ktab, kdim, kcom )
1255      !!----------------------------------------------------------------------
1256      !!                  ***  routine mppmin_a_int  ***
1257      !!
1258      !! ** Purpose :   Find minimum value in an integer layout array
1259      !!
1260      !!----------------------------------------------------------------------
1261      INTEGER , INTENT( in  )                  ::   kdim        ! size of array
1262      INTEGER , INTENT(inout), DIMENSION(kdim) ::   ktab        ! input array
1263      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom        ! input array
1264      !!
1265      INTEGER ::   ierror, localcomm   ! temporary integer
1266      INTEGER, DIMENSION(kdim) ::   iwork
1267      !!----------------------------------------------------------------------
1268      !
1269      localcomm = mpi_comm_opa
1270      IF( PRESENT(kcom) )   localcomm = kcom
1271      !
1272      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_min, localcomm, ierror )
1273      !
1274      ktab(:) = iwork(:)
1275      !
1276   END SUBROUTINE mppmin_a_int
1277
1278
1279   SUBROUTINE mppmin_int( ktab, kcom )
1280      !!----------------------------------------------------------------------
1281      !!                  ***  routine mppmin_int  ***
1282      !!
1283      !! ** Purpose :   Find minimum value in an integer layout array
1284      !!
1285      !!----------------------------------------------------------------------
1286      INTEGER, INTENT(inout) ::   ktab      ! ???
1287      INTEGER , INTENT( in  ), OPTIONAL        ::   kcom        ! input array
1288      !!
1289      INTEGER ::  ierror, iwork, localcomm
1290      !!----------------------------------------------------------------------
1291      !
1292      localcomm = mpi_comm_opa
1293      IF( PRESENT(kcom) )   localcomm = kcom
1294      !
1295     CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_min, localcomm, ierror )
1296      !
1297      ktab = iwork
1298      !
1299   END SUBROUTINE mppmin_int
1300
1301
1302   SUBROUTINE mppsum_a_int( ktab, kdim )
1303      !!----------------------------------------------------------------------
1304      !!                  ***  routine mppsum_a_int  ***
1305      !!
1306      !! ** Purpose :   Global integer sum, 1D array case
1307      !!
1308      !!----------------------------------------------------------------------
1309      INTEGER, INTENT(in   )                   ::   kdim      ! ???
1310      INTEGER, INTENT(inout), DIMENSION (kdim) ::   ktab      ! ???
1311      !!
1312      INTEGER :: ierror
1313      INTEGER, DIMENSION (kdim) ::  iwork
1314      !!----------------------------------------------------------------------
1315      !
1316      CALL mpi_allreduce( ktab, iwork, kdim, mpi_integer, mpi_sum, mpi_comm_opa, ierror )
1317      !
1318      ktab(:) = iwork(:)
1319      !
1320   END SUBROUTINE mppsum_a_int
1321
1322
1323   SUBROUTINE mppsum_int( ktab )
1324      !!----------------------------------------------------------------------
1325      !!                 ***  routine mppsum_int  ***
1326      !!
1327      !! ** Purpose :   Global integer sum
1328      !!
1329      !!----------------------------------------------------------------------
1330      INTEGER, INTENT(inout) ::   ktab
1331      !!
1332      INTEGER :: ierror, iwork
1333      !!----------------------------------------------------------------------
1334      !
1335      CALL mpi_allreduce( ktab, iwork, 1, mpi_integer, mpi_sum, mpi_comm_opa, ierror )
1336      !
1337      ktab = iwork
1338      !
1339   END SUBROUTINE mppsum_int
1340
1341
1342   SUBROUTINE mppmax_a_real( ptab, kdim, kcom )
1343      !!----------------------------------------------------------------------
1344      !!                 ***  routine mppmax_a_real  ***
1345      !!
1346      !! ** Purpose :   Maximum
1347      !!
1348      !!----------------------------------------------------------------------
1349      INTEGER , INTENT(in   )                  ::   kdim
1350      REAL(wp), INTENT(inout), DIMENSION(kdim) ::   ptab
1351      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom
1352      !!
1353      INTEGER :: ierror, localcomm
1354      REAL(wp), DIMENSION(kdim) ::  zwork
1355      !!----------------------------------------------------------------------
1356      !
1357      localcomm = mpi_comm_opa
1358      IF( PRESENT(kcom) ) localcomm = kcom
1359      !
1360      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_max, localcomm, ierror )
1361      ptab(:) = zwork(:)
1362      !
1363   END SUBROUTINE mppmax_a_real
1364
1365
1366   SUBROUTINE mppmax_real( ptab, kcom )
1367      !!----------------------------------------------------------------------
1368      !!                  ***  routine mppmax_real  ***
1369      !!
1370      !! ** Purpose :   Maximum
1371      !!
1372      !!----------------------------------------------------------------------
1373      REAL(wp), INTENT(inout)           ::   ptab   ! ???
1374      INTEGER , INTENT(in   ), OPTIONAL ::   kcom   ! ???
1375      !!
1376      INTEGER  ::   ierror, localcomm
1377      REAL(wp) ::   zwork
1378      !!----------------------------------------------------------------------
1379      !
1380      localcomm = mpi_comm_opa
1381      IF( PRESENT(kcom) )   localcomm = kcom
1382      !
1383      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_max, localcomm, ierror )
1384      ptab = zwork
1385      !
1386   END SUBROUTINE mppmax_real
1387
1388
1389   SUBROUTINE mppmin_a_real( ptab, kdim, kcom )
1390      !!----------------------------------------------------------------------
1391      !!                 ***  routine mppmin_a_real  ***
1392      !!
1393      !! ** Purpose :   Minimum of REAL, array case
1394      !!
1395      !!-----------------------------------------------------------------------
1396      INTEGER , INTENT(in   )                  ::   kdim
1397      REAL(wp), INTENT(inout), DIMENSION(kdim) ::   ptab
1398      INTEGER , INTENT(in   ), OPTIONAL        ::   kcom
1399      !!
1400      INTEGER :: ierror, localcomm
1401      REAL(wp), DIMENSION(kdim) ::   zwork
1402      !!-----------------------------------------------------------------------
1403      !
1404      localcomm = mpi_comm_opa
1405      IF( PRESENT(kcom) ) localcomm = kcom
1406      !
1407      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_min, localcomm, ierror )
1408      ptab(:) = zwork(:)
1409      !
1410   END SUBROUTINE mppmin_a_real
1411
1412
1413   SUBROUTINE mppmin_real( ptab, kcom )
1414      !!----------------------------------------------------------------------
1415      !!                  ***  routine mppmin_real  ***
1416      !!
1417      !! ** Purpose :   minimum of REAL, scalar case
1418      !!
1419      !!-----------------------------------------------------------------------
1420      REAL(wp), INTENT(inout)           ::   ptab        !
1421      INTEGER , INTENT(in   ), OPTIONAL :: kcom
1422      !!
1423      INTEGER  ::   ierror
1424      REAL(wp) ::   zwork
1425      INTEGER :: localcomm
1426      !!-----------------------------------------------------------------------
1427      !
1428      localcomm = mpi_comm_opa
1429      IF( PRESENT(kcom) )   localcomm = kcom
1430      !
1431      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_min, localcomm, ierror )
1432      ptab = zwork
1433      !
1434   END SUBROUTINE mppmin_real
1435
1436
1437   SUBROUTINE mppsum_a_real( ptab, kdim, kcom )
1438      !!----------------------------------------------------------------------
1439      !!                  ***  routine mppsum_a_real  ***
1440      !!
1441      !! ** Purpose :   global sum, REAL ARRAY argument case
1442      !!
1443      !!-----------------------------------------------------------------------
1444      INTEGER , INTENT( in )                     ::   kdim      ! size of ptab
1445      REAL(wp), DIMENSION(kdim), INTENT( inout ) ::   ptab      ! input array
1446      INTEGER , INTENT( in ), OPTIONAL           :: kcom
1447      !!
1448      INTEGER                   ::   ierror    ! temporary integer
1449      INTEGER                   ::   localcomm
1450      REAL(wp), DIMENSION(kdim) ::   zwork     ! temporary workspace
1451      !!-----------------------------------------------------------------------
1452      !
1453      localcomm = mpi_comm_opa
1454      IF( PRESENT(kcom) )   localcomm = kcom
1455      !
1456      CALL mpi_allreduce( ptab, zwork, kdim, mpi_double_precision, mpi_sum, localcomm, ierror )
1457      ptab(:) = zwork(:)
1458      !
1459   END SUBROUTINE mppsum_a_real
1460
1461
1462   SUBROUTINE mppsum_real( ptab, kcom )
1463      !!----------------------------------------------------------------------
1464      !!                  ***  routine mppsum_real  ***
1465      !!
1466      !! ** Purpose :   global sum, SCALAR argument case
1467      !!
1468      !!-----------------------------------------------------------------------
1469      REAL(wp), INTENT(inout)           ::   ptab   ! input scalar
1470      INTEGER , INTENT(in   ), OPTIONAL ::   kcom
1471      !!
1472      INTEGER  ::   ierror, localcomm
1473      REAL(wp) ::   zwork
1474      !!-----------------------------------------------------------------------
1475      !
1476      localcomm = mpi_comm_opa
1477      IF( PRESENT(kcom) ) localcomm = kcom
1478      !
1479      CALL mpi_allreduce( ptab, zwork, 1, mpi_double_precision, mpi_sum, localcomm, ierror )
1480      ptab = zwork
1481      !
1482   END SUBROUTINE mppsum_real
1483
1484   SUBROUTINE mppsum_realdd( ytab, kcom )
1485      !!----------------------------------------------------------------------
1486      !!                  ***  routine mppsum_realdd ***
1487      !!
1488      !! ** Purpose :   global sum in Massively Parallel Processing
1489      !!                SCALAR argument case for double-double precision
1490      !!
1491      !!-----------------------------------------------------------------------
1492      COMPLEX(wp), INTENT(inout)         :: ytab    ! input scalar
1493      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1494
1495      !! * Local variables   (MPI version)
1496      INTEGER  ::    ierror
1497      INTEGER  ::   localcomm
1498      COMPLEX(wp) :: zwork
1499
1500      localcomm = mpi_comm_opa
1501      IF( PRESENT(kcom) ) localcomm = kcom
1502
1503      ! reduce local sums into global sum
1504      CALL MPI_ALLREDUCE (ytab, zwork, 1, MPI_DOUBLE_COMPLEX, &
1505                       MPI_SUMDD,localcomm,ierror)
1506      ytab = zwork
1507
1508   END SUBROUTINE mppsum_realdd
1509
1510
1511   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
1512      !!----------------------------------------------------------------------
1513      !!                  ***  routine mppsum_a_realdd  ***
1514      !!
1515      !! ** Purpose :   global sum in Massively Parallel Processing
1516      !!                COMPLEX ARRAY case for double-double precision
1517      !!
1518      !!-----------------------------------------------------------------------
1519      INTEGER , INTENT( in )                        ::   kdim      ! size of ytab
1520      COMPLEX(wp), DIMENSION(kdim), INTENT( inout ) ::   ytab      ! input array
1521      INTEGER , INTENT( in  ), OPTIONAL :: kcom
1522
1523      !! * Local variables   (MPI version)
1524      INTEGER                      :: ierror    ! temporary integer
1525      INTEGER                      ::   localcomm
1526      COMPLEX(wp), DIMENSION(kdim) :: zwork     ! temporary workspace
1527
1528      localcomm = mpi_comm_opa
1529      IF( PRESENT(kcom) ) localcomm = kcom
1530
1531      CALL MPI_ALLREDUCE (ytab, zwork, kdim, MPI_DOUBLE_COMPLEX, &
1532                       MPI_SUMDD,localcomm,ierror)
1533      ytab(:) = zwork(:)
1534
1535   END SUBROUTINE mppsum_a_realdd
1536
1537   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki,kj )
1538      !!------------------------------------------------------------------------
1539      !!             ***  routine mpp_minloc  ***
1540      !!
1541      !! ** Purpose :   Compute the global minimum of an array ptab
1542      !!              and also give its global position
1543      !!
1544      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1545      !!
1546      !!--------------------------------------------------------------------------
1547      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab    ! Local 2D array
1548      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask   ! Local mask
1549      REAL(wp)                     , INTENT(  out) ::   pmin    ! Global minimum of ptab
1550      INTEGER                      , INTENT(  out) ::   ki, kj   ! index of minimum in global frame
1551      !!
1552      INTEGER , DIMENSION(2)   ::   ilocs
1553      INTEGER :: ierror
1554      REAL(wp) ::   zmin   ! local minimum
1555      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1556      !!-----------------------------------------------------------------------
1557      !
1558      zmin  = MINVAL( ptab(:,:) , mask= pmask == 1.e0 )
1559      ilocs = MINLOC( ptab(:,:) , mask= pmask == 1.e0 )
1560      !
1561      ki = ilocs(1) + nimpp - 1
1562      kj = ilocs(2) + njmpp - 1
1563      !
1564      zain(1,:)=zmin
1565      zain(2,:)=ki+10000.*kj
1566      !
1567      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OPA,ierror)
1568      !
1569      pmin = zaout(1,1)
1570      kj = INT(zaout(2,1)/10000.)
1571      ki = INT(zaout(2,1) - 10000.*kj )
1572      !
1573   END SUBROUTINE mpp_minloc2d
1574
1575
1576   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj ,kk)
1577      !!------------------------------------------------------------------------
1578      !!             ***  routine mpp_minloc  ***
1579      !!
1580      !! ** Purpose :   Compute the global minimum of an array ptab
1581      !!              and also give its global position
1582      !!
1583      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1584      !!
1585      !!--------------------------------------------------------------------------
1586      REAL(wp), DIMENSION (jpi,jpj,jpk), INTENT(in   ) ::   ptab         ! Local 2D array
1587      REAL(wp), DIMENSION (jpi,jpj,jpk), INTENT(in   ) ::   pmask        ! Local mask
1588      REAL(wp)                         , INTENT(  out) ::   pmin         ! Global minimum of ptab
1589      INTEGER                          , INTENT(  out) ::   ki, kj, kk   ! index of minimum in global frame
1590      !!
1591      INTEGER  ::   ierror
1592      REAL(wp) ::   zmin     ! local minimum
1593      INTEGER , DIMENSION(3)   ::   ilocs
1594      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1595      !!-----------------------------------------------------------------------
1596      !
1597      zmin  = MINVAL( ptab(:,:,:) , mask= pmask == 1.e0 )
1598      ilocs = MINLOC( ptab(:,:,:) , mask= pmask == 1.e0 )
1599      !
1600      ki = ilocs(1) + nimpp - 1
1601      kj = ilocs(2) + njmpp - 1
1602      kk = ilocs(3)
1603      !
1604      zain(1,:)=zmin
1605      zain(2,:)=ki+10000.*kj+100000000.*kk
1606      !
1607      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MINLOC,MPI_COMM_OPA,ierror)
1608      !
1609      pmin = zaout(1,1)
1610      kk   = INT( zaout(2,1) / 100000000. )
1611      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
1612      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
1613      !
1614   END SUBROUTINE mpp_minloc3d
1615
1616
1617   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
1618      !!------------------------------------------------------------------------
1619      !!             ***  routine mpp_maxloc  ***
1620      !!
1621      !! ** Purpose :   Compute the global maximum of an array ptab
1622      !!              and also give its global position
1623      !!
1624      !! ** Method  :   Use MPI_ALLREDUCE with MPI_MINLOC
1625      !!
1626      !!--------------------------------------------------------------------------
1627      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   ptab     ! Local 2D array
1628      REAL(wp), DIMENSION (jpi,jpj), INTENT(in   ) ::   pmask    ! Local mask
1629      REAL(wp)                     , INTENT(  out) ::   pmax     ! Global maximum of ptab
1630      INTEGER                      , INTENT(  out) ::   ki, kj   ! index of maximum in global frame
1631      !!
1632      INTEGER  :: ierror
1633      INTEGER, DIMENSION (2)   ::   ilocs
1634      REAL(wp) :: zmax   ! local maximum
1635      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1636      !!-----------------------------------------------------------------------
1637      !
1638      zmax  = MAXVAL( ptab(:,:) , mask= pmask == 1.e0 )
1639      ilocs = MAXLOC( ptab(:,:) , mask= pmask == 1.e0 )
1640      !
1641      ki = ilocs(1) + nimpp - 1
1642      kj = ilocs(2) + njmpp - 1
1643      !
1644      zain(1,:) = zmax
1645      zain(2,:) = ki + 10000. * kj
1646      !
1647      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OPA,ierror)
1648      !
1649      pmax = zaout(1,1)
1650      kj   = INT( zaout(2,1) / 10000.     )
1651      ki   = INT( zaout(2,1) - 10000.* kj )
1652      !
1653   END SUBROUTINE mpp_maxloc2d
1654
1655
1656   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
1657      !!------------------------------------------------------------------------
1658      !!             ***  routine mpp_maxloc  ***
1659      !!
1660      !! ** Purpose :  Compute the global maximum of an array ptab
1661      !!              and also give its global position
1662      !!
1663      !! ** Method : Use MPI_ALLREDUCE with MPI_MINLOC
1664      !!
1665      !!--------------------------------------------------------------------------
1666      REAL(wp), DIMENSION (jpi,jpj,jpk), INTENT(in   ) ::   ptab         ! Local 2D array
1667      REAL(wp), DIMENSION (jpi,jpj,jpk), INTENT(in   ) ::   pmask        ! Local mask
1668      REAL(wp)                         , INTENT(  out) ::   pmax         ! Global maximum of ptab
1669      INTEGER                          , INTENT(  out) ::   ki, kj, kk   ! index of maximum in global frame
1670      !!
1671      REAL(wp) :: zmax   ! local maximum
1672      REAL(wp), DIMENSION(2,1) ::   zain, zaout
1673      INTEGER , DIMENSION(3)   ::   ilocs
1674      INTEGER :: ierror
1675      !!-----------------------------------------------------------------------
1676      !
1677      zmax  = MAXVAL( ptab(:,:,:) , mask= pmask == 1.e0 )
1678      ilocs = MAXLOC( ptab(:,:,:) , mask= pmask == 1.e0 )
1679      !
1680      ki = ilocs(1) + nimpp - 1
1681      kj = ilocs(2) + njmpp - 1
1682      kk = ilocs(3)
1683      !
1684      zain(1,:)=zmax
1685      zain(2,:)=ki+10000.*kj+100000000.*kk
1686      !
1687      CALL MPI_ALLREDUCE( zain,zaout, 1, MPI_2DOUBLE_PRECISION,MPI_MAXLOC,MPI_COMM_OPA,ierror)
1688      !
1689      pmax = zaout(1,1)
1690      kk   = INT( zaout(2,1) / 100000000. )
1691      kj   = INT( zaout(2,1) - kk * 100000000. ) / 10000
1692      ki   = INT( zaout(2,1) - kk * 100000000. -kj * 10000. )
1693      !
1694   END SUBROUTINE mpp_maxloc3d
1695
1696
1697   SUBROUTINE mppsync()
1698      !!----------------------------------------------------------------------
1699      !!                  ***  routine mppsync  ***
1700      !!
1701      !! ** Purpose :   Massively parallel processors, synchroneous
1702      !!
1703      !!-----------------------------------------------------------------------
1704      INTEGER :: ierror
1705      !!-----------------------------------------------------------------------
1706      !
1707      CALL mpi_barrier( mpi_comm_opa, ierror )
1708      !
1709   END SUBROUTINE mppsync
1710
1711
1712   SUBROUTINE mppstop
1713      !!----------------------------------------------------------------------
1714      !!                  ***  routine mppstop  ***
1715      !!
1716      !! ** purpose :   Stop massively parallel processors method
1717      !!
1718      !!----------------------------------------------------------------------
1719      INTEGER ::   info
1720      !!----------------------------------------------------------------------
1721      !
1722      CALL mppsync
1723      CALL mpi_finalize( info )
1724      !
1725   END SUBROUTINE mppstop
1726
1727
1728   SUBROUTINE mpp_comm_free( kcom )
1729      !!----------------------------------------------------------------------
1730      !!----------------------------------------------------------------------
1731      INTEGER, INTENT(in) ::   kcom
1732      !!
1733      INTEGER :: ierr
1734      !!----------------------------------------------------------------------
1735      !
1736      CALL MPI_COMM_FREE(kcom, ierr)
1737      !
1738   END SUBROUTINE mpp_comm_free
1739
1740
1741   SUBROUTINE mpp_ini_ice( pindic, kumout )
1742      !!----------------------------------------------------------------------
1743      !!               ***  routine mpp_ini_ice  ***
1744      !!
1745      !! ** Purpose :   Initialize special communicator for ice areas
1746      !!      condition together with global variables needed in the ddmpp folding
1747      !!
1748      !! ** Method  : - Look for ice processors in ice routines
1749      !!              - Put their number in nrank_ice
1750      !!              - Create groups for the world processors and the ice processors
1751      !!              - Create a communicator for ice processors
1752      !!
1753      !! ** output
1754      !!      njmppmax = njmpp for northern procs
1755      !!      ndim_rank_ice = number of processors with ice
1756      !!      nrank_ice (ndim_rank_ice) = ice processors
1757      !!      ngrp_iworld = group ID for the world processors
1758      !!      ngrp_ice = group ID for the ice processors
1759      !!      ncomm_ice = communicator for the ice procs.
1760      !!      n_ice_root = number (in the world) of proc 0 in the ice comm.
1761      !!
1762      !!----------------------------------------------------------------------
1763      INTEGER, INTENT(in) ::   pindic
1764      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical unit
1765      !!
1766      INTEGER :: jjproc
1767      INTEGER :: ii, ierr
1768      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kice
1769      INTEGER, ALLOCATABLE, DIMENSION(:) ::   zwork
1770      !!----------------------------------------------------------------------
1771      !
1772      ! Since this is just an init routine and these arrays are of length jpnij
1773      ! then don't use wrk_nemo module - just allocate and deallocate.
1774      ALLOCATE( kice(jpnij), zwork(jpnij), STAT=ierr )
1775      IF( ierr /= 0 ) THEN
1776         WRITE(kumout, cform_err)
1777         WRITE(kumout,*) 'mpp_ini_ice : failed to allocate 2, 1D arrays (jpnij in length)'
1778         CALL mppstop
1779      ENDIF
1780
1781      ! Look for how many procs with sea-ice
1782      !
1783      kice = 0
1784      DO jjproc = 1, jpnij
1785         IF( jjproc == narea .AND. pindic .GT. 0 )   kice(jjproc) = 1
1786      END DO
1787      !
1788      zwork = 0
1789      CALL MPI_ALLREDUCE( kice, zwork, jpnij, mpi_integer, mpi_sum, mpi_comm_opa, ierr )
1790      ndim_rank_ice = SUM( zwork )
1791
1792      ! Allocate the right size to nrank_north
1793      IF( ALLOCATED ( nrank_ice ) )   DEALLOCATE( nrank_ice )
1794      ALLOCATE( nrank_ice(ndim_rank_ice) )
1795      !
1796      ii = 0
1797      nrank_ice = 0
1798      DO jjproc = 1, jpnij
1799         IF( zwork(jjproc) == 1) THEN
1800            ii = ii + 1
1801            nrank_ice(ii) = jjproc -1
1802         ENDIF
1803      END DO
1804
1805      ! Create the world group
1806      CALL MPI_COMM_GROUP( mpi_comm_opa, ngrp_iworld, ierr )
1807
1808      ! Create the ice group from the world group
1809      CALL MPI_GROUP_INCL( ngrp_iworld, ndim_rank_ice, nrank_ice, ngrp_ice, ierr )
1810
1811      ! Create the ice communicator , ie the pool of procs with sea-ice
1812      CALL MPI_COMM_CREATE( mpi_comm_opa, ngrp_ice, ncomm_ice, ierr )
1813
1814      ! Find proc number in the world of proc 0 in the north
1815      ! The following line seems to be useless, we just comment & keep it as reminder
1816      ! CALL MPI_GROUP_TRANSLATE_RANKS(ngrp_ice,1,0,ngrp_iworld,n_ice_root,ierr)
1817      !
1818      CALL MPI_GROUP_FREE(ngrp_ice, ierr)
1819      CALL MPI_GROUP_FREE(ngrp_iworld, ierr)
1820
1821      DEALLOCATE(kice, zwork)
1822      !
1823   END SUBROUTINE mpp_ini_ice
1824
1825
1826   SUBROUTINE mpp_ini_znl( kumout )
1827      !!----------------------------------------------------------------------
1828      !!               ***  routine mpp_ini_znl  ***
1829      !!
1830      !! ** Purpose :   Initialize special communicator for computing zonal sum
1831      !!
1832      !! ** Method  : - Look for processors in the same row
1833      !!              - Put their number in nrank_znl
1834      !!              - Create group for the znl processors
1835      !!              - Create a communicator for znl processors
1836      !!              - Determine if processor should write znl files
1837      !!
1838      !! ** output
1839      !!      ndim_rank_znl = number of processors on the same row
1840      !!      ngrp_znl = group ID for the znl processors
1841      !!      ncomm_znl = communicator for the ice procs.
1842      !!      n_znl_root = number (in the world) of proc 0 in the ice comm.
1843      !!
1844      !!----------------------------------------------------------------------
1845      INTEGER, INTENT(in) ::   kumout   ! ocean.output logical units
1846      !
1847      INTEGER :: jproc      ! dummy loop integer
1848      INTEGER :: ierr, ii   ! local integer
1849      INTEGER, ALLOCATABLE, DIMENSION(:) ::   kwork
1850      !!----------------------------------------------------------------------
1851      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_world     : ', ngrp_world
1852      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_world : ', mpi_comm_world
1853      !-$$     WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - mpi_comm_opa   : ', mpi_comm_opa
1854      !
1855      ALLOCATE( kwork(jpnij), STAT=ierr )
1856      IF( ierr /= 0 ) THEN
1857         WRITE(kumout, cform_err)
1858         WRITE(kumout,*) 'mpp_ini_znl : failed to allocate 1D array of length jpnij'
1859         CALL mppstop
1860      ENDIF
1861
1862      IF( jpnj == 1 ) THEN
1863         ngrp_znl  = ngrp_world
1864         ncomm_znl = mpi_comm_opa
1865      ELSE
1866         !
1867         CALL MPI_ALLGATHER ( njmpp, 1, mpi_integer, kwork, 1, mpi_integer, mpi_comm_opa, ierr )
1868         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - kwork pour njmpp : ', kwork
1869         !-$$        CALL flush(numout)
1870         !
1871         ! Count number of processors on the same row
1872         ndim_rank_znl = 0
1873         DO jproc=1,jpnij
1874            IF ( kwork(jproc) == njmpp ) THEN
1875               ndim_rank_znl = ndim_rank_znl + 1
1876            ENDIF
1877         END DO
1878         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ndim_rank_znl : ', ndim_rank_znl
1879         !-$$        CALL flush(numout)
1880         ! Allocate the right size to nrank_znl
1881         IF (ALLOCATED (nrank_znl)) DEALLOCATE(nrank_znl)
1882         ALLOCATE(nrank_znl(ndim_rank_znl))
1883         ii = 0
1884         nrank_znl (:) = 0
1885         DO jproc=1,jpnij
1886            IF ( kwork(jproc) == njmpp) THEN
1887               ii = ii + 1
1888               nrank_znl(ii) = jproc -1
1889            ENDIF
1890         END DO
1891         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - nrank_znl : ', nrank_znl
1892         !-$$        CALL flush(numout)
1893
1894         ! Create the opa group
1895         CALL MPI_COMM_GROUP(mpi_comm_opa,ngrp_opa,ierr)
1896         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_opa : ', ngrp_opa
1897         !-$$        CALL flush(numout)
1898
1899         ! Create the znl group from the opa group
1900         CALL MPI_GROUP_INCL  ( ngrp_opa, ndim_rank_znl, nrank_znl, ngrp_znl, ierr )
1901         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ngrp_znl ', ngrp_znl
1902         !-$$        CALL flush(numout)
1903
1904         ! Create the znl communicator from the opa communicator, ie the pool of procs in the same row
1905         CALL MPI_COMM_CREATE ( mpi_comm_opa, ngrp_znl, ncomm_znl, ierr )
1906         !-$$        WRITE (numout,*) 'mpp_ini_znl ', nproc, ' - ncomm_znl ', ncomm_znl
1907         !-$$        CALL flush(numout)
1908         !
1909      END IF
1910
1911      ! Determines if processor if the first (starting from i=1) on the row
1912      IF ( jpni == 1 ) THEN
1913         l_znl_root = .TRUE.
1914      ELSE
1915         l_znl_root = .FALSE.
1916         kwork (1) = nimpp
1917         CALL mpp_min ( kwork(1), kcom = ncomm_znl)
1918         IF ( nimpp == kwork(1)) l_znl_root = .TRUE.
1919      END IF
1920
1921      DEALLOCATE(kwork)
1922
1923   END SUBROUTINE mpp_ini_znl
1924
1925
1926   SUBROUTINE mpp_ini_north
1927      !!----------------------------------------------------------------------
1928      !!               ***  routine mpp_ini_north  ***
1929      !!
1930      !! ** Purpose :   Initialize special communicator for north folding
1931      !!      condition together with global variables needed in the mpp folding
1932      !!
1933      !! ** Method  : - Look for northern processors
1934      !!              - Put their number in nrank_north
1935      !!              - Create groups for the world processors and the north processors
1936      !!              - Create a communicator for northern processors
1937      !!
1938      !! ** output
1939      !!      njmppmax = njmpp for northern procs
1940      !!      ndim_rank_north = number of processors in the northern line
1941      !!      nrank_north (ndim_rank_north) = number  of the northern procs.
1942      !!      ngrp_world = group ID for the world processors
1943      !!      ngrp_north = group ID for the northern processors
1944      !!      ncomm_north = communicator for the northern procs.
1945      !!      north_root = number (in the world) of proc 0 in the northern comm.
1946      !!
1947      !!----------------------------------------------------------------------
1948      INTEGER ::   ierr
1949      INTEGER ::   jjproc
1950      INTEGER ::   ii, ji
1951      !!----------------------------------------------------------------------
1952      !
1953      njmppmax = MAXVAL( njmppt )
1954      !
1955      ! Look for how many procs on the northern boundary
1956      ndim_rank_north = 0
1957      DO jjproc = 1, jpnij
1958         IF( njmppt(jjproc) == njmppmax )   ndim_rank_north = ndim_rank_north + 1
1959      END DO
1960      !
1961      ! Allocate the right size to nrank_north
1962      IF (ALLOCATED (nrank_north)) DEALLOCATE(nrank_north)
1963      ALLOCATE( nrank_north(ndim_rank_north) )
1964
1965      ! Fill the nrank_north array with proc. number of northern procs.
1966      ! Note : the rank start at 0 in MPI
1967      ii = 0
1968      DO ji = 1, jpnij
1969         IF ( njmppt(ji) == njmppmax   ) THEN
1970            ii=ii+1
1971            nrank_north(ii)=ji-1
1972         END IF
1973      END DO
1974      !
1975      ! create the world group
1976      CALL MPI_COMM_GROUP( mpi_comm_opa, ngrp_world, ierr )
1977      !
1978      ! Create the North group from the world group
1979      CALL MPI_GROUP_INCL( ngrp_world, ndim_rank_north, nrank_north, ngrp_north, ierr )
1980      !
1981      ! Create the North communicator , ie the pool of procs in the north group
1982      CALL MPI_COMM_CREATE( mpi_comm_opa, ngrp_north, ncomm_north, ierr )
1983      !
1984   END SUBROUTINE mpp_ini_north
1985
1986
1987   SUBROUTINE mpp_lbc_north_3d( pt3d, cd_type, psgn )
1988      !!---------------------------------------------------------------------
1989      !!                   ***  routine mpp_lbc_north_3d  ***
1990      !!
1991      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
1992      !!              in mpp configuration in case of jpn1 > 1
1993      !!
1994      !! ** Method  :   North fold condition and mpp with more than one proc
1995      !!              in i-direction require a specific treatment. We gather
1996      !!              the 4 northern lines of the global domain on 1 processor
1997      !!              and apply lbc north-fold on this sub array. Then we
1998      !!              scatter the north fold array back to the processors.
1999      !!
2000      !!----------------------------------------------------------------------
2001      REAL(wp), DIMENSION(jpi,jpj,jpk), INTENT(inout) ::   pt3d      ! 3D array on which the b.c. is applied
2002      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type   ! nature of pt3d grid-points
2003      !                                                              !   = T ,  U , V , F or W  gridpoints
2004      REAL(wp)                        , INTENT(in   ) ::   psgn      ! = -1. the sign change across the north fold
2005      !!                                                             ! =  1. , the sign is kept
2006      INTEGER ::   ji, jj, jr, jk
2007      INTEGER ::   ierr, itaille, ildi, ilei, iilb
2008      INTEGER ::   ijpj, ijpjm1, ij, iproc
2009      INTEGER, DIMENSION (jpmaxngh)          ::   ml_req_nf          !for mpi_isend when avoiding mpi_allgather
2010      INTEGER                                ::   ml_err             ! for mpi_isend when avoiding mpi_allgather
2011      INTEGER, DIMENSION(MPI_STATUS_SIZE)    ::   ml_stat            ! for mpi_isend when avoiding mpi_allgather
2012      !                                                              ! Workspace for message transfers avoiding mpi_allgather
2013      REAL(wp), DIMENSION(:,:,:)  , ALLOCATABLE   :: ztab
2014      REAL(wp), DIMENSION(:,:,:)  , ALLOCATABLE   :: znorthloc, zfoldwk     
2015      REAL(wp), DIMENSION(:,:,:,:), ALLOCATABLE   :: znorthgloio
2016      REAL(wp), DIMENSION(:,:,:)  , ALLOCATABLE   :: ztabl, ztabr
2017
2018      INTEGER :: istatus(mpi_status_size)
2019      INTEGER :: iflag
2020      !!----------------------------------------------------------------------
2021      !
2022      ALLOCATE( ztab(jpiglo,4,jpk) , znorthloc(jpi,4,jpk), zfoldwk(jpi,4,jpk), znorthgloio(jpi,4,jpk,jpni) )
2023      ALLOCATE( ztabl(jpi,4,jpk), ztabr(jpi*jpmaxngh, 4, jpk) ) 
2024
2025      ijpj   = 4
2026      ijpjm1 = 3
2027      !
2028      DO jk = 1, jpk
2029         DO jj = nlcj - ijpj +1, nlcj          ! put in xnorthloc the last 4 jlines of pt3d
2030            ij = jj - nlcj + ijpj
2031            znorthloc(:,ij,jk) = pt3d(:,jj,jk)
2032         END DO
2033      END DO
2034      !
2035      !                                     ! Build in procs of ncomm_north the znorthgloio
2036      itaille = jpi * jpk * ijpj
2037
2038
2039      IF ( l_north_nogather ) THEN
2040         !
2041        ztabr(:,:,:) = 0
2042        DO jk = 1, jpk
2043           DO jj = nlcj-ijpj+1, nlcj          ! First put local values into the global array
2044              ij = jj - nlcj + ijpj
2045              DO ji = 1, nlci
2046                 ztabl(ji,ij,jk) = pt3d(ji,jj,jk)
2047              END DO
2048           END DO
2049        END DO
2050
2051         DO jr = 1,nsndto
2052            IF (isendto(jr) .ne. narea) CALL mppsend( 5, znorthloc, itaille, isendto(jr)-1, ml_req_nf(jr) )
2053         END DO
2054         DO jr = 1,nsndto
2055            iproc = isendto(jr)
2056            ildi = nldit (iproc)
2057            ilei = nleit (iproc)
2058            iilb = nimppt(isendto(jr)) - nimppt(isendto(1))
2059            IF(isendto(jr) .ne. narea) THEN
2060              CALL mpprecv(5, zfoldwk, itaille, isendto(jr)-1)
2061              DO jk = 1, jpk
2062                 DO jj = 1, ijpj
2063                    DO ji = 1, ilei
2064                       ztabr(iilb+ji,jj,jk) = zfoldwk(ji,jj,jk)
2065                    END DO
2066                 END DO
2067              END DO
2068           ELSE
2069              DO jk = 1, jpk
2070                 DO jj = 1, ijpj
2071                    DO ji = 1, ilei
2072                       ztabr(iilb+ji,jj,jk) = pt3d(ji,nlcj-ijpj+jj,jk)
2073                    END DO
2074                 END DO
2075              END DO
2076           ENDIF
2077         END DO
2078         IF (l_isend) THEN
2079            DO jr = 1,nsndto
2080               IF (isendto(jr) .ne. narea) CALL mpi_wait(ml_req_nf(jr), ml_stat, ml_err)
2081            END DO
2082         ENDIF
2083         CALL mpp_lbc_nfd( ztabl, ztabr, cd_type, psgn )   ! North fold boundary condition
2084         !
2085         DO jk = 1, jpk
2086            DO jj = nlcj-ijpj+1, nlcj             ! Scatter back to pt3d
2087               ij = jj - nlcj + ijpj
2088               DO ji= 1, nlci
2089                  pt3d(ji,jj,jk) = ztabl(ji,ij,jk)
2090               END DO
2091            END DO
2092         END DO
2093         !
2094
2095      ELSE
2096         CALL MPI_ALLGATHER( znorthloc  , itaille, MPI_DOUBLE_PRECISION,                &
2097            &                znorthgloio, itaille, MPI_DOUBLE_PRECISION, ncomm_north, ierr )
2098         !
2099         ztab(:,:,:) = 0.e0
2100         DO jr = 1, ndim_rank_north         ! recover the global north array
2101            iproc = nrank_north(jr) + 1
2102            ildi  = nldit (iproc)
2103            ilei  = nleit (iproc)
2104            iilb  = nimppt(iproc)
2105            DO jk = 1, jpk
2106               DO jj = 1, ijpj
2107                  DO ji = ildi, ilei
2108                    ztab(ji+iilb-1,jj,jk) = znorthgloio(ji,jj,jk,jr)
2109                  END DO
2110               END DO
2111            END DO
2112         END DO
2113         CALL lbc_nfd( ztab, cd_type, psgn )   ! North fold boundary condition
2114         !
2115         DO jk = 1, jpk
2116            DO jj = nlcj-ijpj+1, nlcj             ! Scatter back to pt3d
2117               ij = jj - nlcj + ijpj
2118               DO ji= 1, nlci
2119                  pt3d(ji,jj,jk) = ztab(ji+nimpp-1,ij,jk)
2120               END DO
2121            END DO
2122         END DO
2123         !
2124      ENDIF
2125      !
2126      ! The ztab array has been either:
2127      !  a. Fully populated by the mpi_allgather operation or
2128      !  b. Had the active points for this domain and northern neighbours populated
2129      !     by peer to peer exchanges
2130      ! Either way the array may be folded by lbc_nfd and the result for the span of
2131      ! this domain will be identical.
2132      !
2133      DEALLOCATE( ztab, znorthloc, zfoldwk, znorthgloio )
2134      DEALLOCATE( ztabl, ztabr ) 
2135      !
2136   END SUBROUTINE mpp_lbc_north_3d
2137
2138
2139   SUBROUTINE mpp_lbc_north_2d( pt2d, cd_type, psgn)
2140      !!---------------------------------------------------------------------
2141      !!                   ***  routine mpp_lbc_north_2d  ***
2142      !!
2143      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
2144      !!              in mpp configuration in case of jpn1 > 1 (for 2d array )
2145      !!
2146      !! ** Method  :   North fold condition and mpp with more than one proc
2147      !!              in i-direction require a specific treatment. We gather
2148      !!              the 4 northern lines of the global domain on 1 processor
2149      !!              and apply lbc north-fold on this sub array. Then we
2150      !!              scatter the north fold array back to the processors.
2151      !!
2152      !!----------------------------------------------------------------------
2153      REAL(wp), DIMENSION(jpi,jpj), INTENT(inout) ::   pt2d      ! 2D array on which the b.c. is applied
2154      CHARACTER(len=1)            , INTENT(in   ) ::   cd_type   ! nature of pt2d grid-points
2155      !                                                          !   = T ,  U , V , F or W  gridpoints
2156      REAL(wp)                    , INTENT(in   ) ::   psgn      ! = -1. the sign change across the north fold
2157      !!                                                             ! =  1. , the sign is kept
2158      INTEGER ::   ji, jj, jr
2159      INTEGER ::   ierr, itaille, ildi, ilei, iilb
2160      INTEGER ::   ijpj, ijpjm1, ij, iproc
2161      INTEGER, DIMENSION (jpmaxngh)      ::   ml_req_nf          !for mpi_isend when avoiding mpi_allgather
2162      INTEGER                            ::   ml_err             ! for mpi_isend when avoiding mpi_allgather
2163      INTEGER, DIMENSION(MPI_STATUS_SIZE)::   ml_stat            ! for mpi_isend when avoiding mpi_allgather
2164      !                                                              ! Workspace for message transfers avoiding mpi_allgather
2165      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE   :: ztab
2166      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE   :: znorthloc, zfoldwk     
2167      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE   :: znorthgloio
2168      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE   :: ztabl, ztabr
2169      INTEGER :: istatus(mpi_status_size)
2170      INTEGER :: iflag
2171      !!----------------------------------------------------------------------
2172      !
2173      ALLOCATE( ztab(jpiglo,4), znorthloc(jpi,4), zfoldwk(jpi,4), znorthgloio(jpi,4,jpni) )
2174      ALLOCATE( ztabl(jpi,4), ztabr(jpi*jpmaxngh, 4) ) 
2175      !
2176      ijpj   = 4
2177      ijpjm1 = 3
2178      !
2179      DO jj = nlcj-ijpj+1, nlcj             ! put in znorthloc the last 4 jlines of pt2d
2180         ij = jj - nlcj + ijpj
2181         znorthloc(:,ij) = pt2d(:,jj)
2182      END DO
2183
2184      !                                     ! Build in procs of ncomm_north the znorthgloio
2185      itaille = jpi * ijpj
2186      IF ( l_north_nogather ) THEN
2187         !
2188         ! Avoid the use of mpi_allgather by exchanging only with the processes already identified
2189         ! (in nemo_northcomms) as being  involved in this process' northern boundary exchange
2190         !
2191         ztabr(:,:) = 0
2192         DO jj = nlcj-ijpj+1, nlcj          ! First put local values into the global array
2193            ij = jj - nlcj + ijpj
2194            DO ji = 1, nlci
2195               ztabl(ji,ij) = pt2d(ji,jj)
2196            END DO
2197         END DO
2198
2199         DO jr = 1,nsndto
2200            IF (isendto(jr) .ne. narea) CALL mppsend(5, znorthloc, itaille, isendto(jr)-1, ml_req_nf(jr))
2201         END DO
2202         DO jr = 1,nsndto
2203            iproc = isendto(jr)
2204            ildi = nldit (iproc)
2205            ilei = nleit (iproc)
2206            iilb = nimppt(isendto(jr)) - nimppt(isendto(1))
2207            IF(isendto(jr) .ne. narea) THEN
2208              CALL mpprecv(5, zfoldwk, itaille, isendto(jr)-1)
2209              DO jj = 1, ijpj
2210                 DO ji = 1, ilei
2211                    ztabr(iilb+ji,jj) = zfoldwk(ji,jj)
2212                 END DO
2213              END DO
2214            ELSE
2215              DO jj = 1, ijpj
2216                 DO ji = 1, ilei
2217                    ztabr(iilb+ji,jj) = pt2d(ji,nlcj-ijpj+jj)
2218                 END DO
2219              END DO
2220            ENDIF
2221         END DO
2222         IF (l_isend) THEN
2223            DO jr = 1,nsndto
2224               IF (isendto(jr) .ne. narea) CALL mpi_wait(ml_req_nf(jr), ml_stat, ml_err)
2225            END DO
2226         ENDIF
2227         CALL mpp_lbc_nfd( ztabl, ztabr, cd_type, psgn )   ! North fold boundary condition
2228         !
2229         DO jj = nlcj-ijpj+1, nlcj             ! Scatter back to pt2d
2230            ij = jj - nlcj + ijpj
2231            DO ji = 1, nlci
2232               pt2d(ji,jj) = ztabl(ji,ij)
2233            END DO
2234         END DO
2235         !
2236      ELSE
2237         CALL MPI_ALLGATHER( znorthloc  , itaille, MPI_DOUBLE_PRECISION,        &
2238            &                znorthgloio, itaille, MPI_DOUBLE_PRECISION, ncomm_north, ierr )
2239         !
2240         ztab(:,:) = 0.e0
2241         DO jr = 1, ndim_rank_north            ! recover the global north array
2242            iproc = nrank_north(jr) + 1
2243            ildi = nldit (iproc)
2244            ilei = nleit (iproc)
2245            iilb = nimppt(iproc)
2246            DO jj = 1, ijpj
2247               DO ji = ildi, ilei
2248                  ztab(ji+iilb-1,jj) = znorthgloio(ji,jj,jr)
2249               END DO
2250            END DO
2251         END DO
2252         CALL lbc_nfd( ztab, cd_type, psgn )   ! North fold boundary condition
2253         !
2254         DO jj = nlcj-ijpj+1, nlcj             ! Scatter back to pt2d
2255            ij = jj - nlcj + ijpj
2256            DO ji = 1, nlci
2257               pt2d(ji,jj) = ztab(ji+nimpp-1,ij)
2258            END DO
2259         END DO
2260         !
2261      ENDIF
2262      DEALLOCATE( ztab, znorthloc, zfoldwk, znorthgloio )
2263      DEALLOCATE( ztabl, ztabr ) 
2264      !
2265   END SUBROUTINE mpp_lbc_north_2d
2266
2267
2268   SUBROUTINE mpp_lbc_north_e( pt2d, cd_type, psgn)
2269      !!---------------------------------------------------------------------
2270      !!                   ***  routine mpp_lbc_north_2d  ***
2271      !!
2272      !! ** Purpose :   Ensure proper north fold horizontal bondary condition
2273      !!              in mpp configuration in case of jpn1 > 1 and for 2d
2274      !!              array with outer extra halo
2275      !!
2276      !! ** Method  :   North fold condition and mpp with more than one proc
2277      !!              in i-direction require a specific treatment. We gather
2278      !!              the 4+2*jpr2dj northern lines of the global domain on 1
2279      !!              processor and apply lbc north-fold on this sub array.
2280      !!              Then we scatter the north fold array back to the processors.
2281      !!
2282      !!----------------------------------------------------------------------
2283      REAL(wp), DIMENSION(1-jpr2di:jpi+jpr2di,1-jpr2dj:jpj+jpr2dj), INTENT(inout) ::   pt2d     ! 2D array with extra halo
2284      CHARACTER(len=1)                                            , INTENT(in   ) ::   cd_type  ! nature of pt3d grid-points
2285      !                                                                                         !   = T ,  U , V , F or W -points
2286      REAL(wp)                                                    , INTENT(in   ) ::   psgn     ! = -1. the sign change across the
2287      !!                                                                                        ! north fold, =  1. otherwise
2288      INTEGER ::   ji, jj, jr
2289      INTEGER ::   ierr, itaille, ildi, ilei, iilb
2290      INTEGER ::   ijpj, ij, iproc
2291      !
2292      REAL(wp), DIMENSION(:,:)  , ALLOCATABLE  ::  ztab_e, znorthloc_e
2293      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE  ::  znorthgloio_e
2294
2295      !!----------------------------------------------------------------------
2296      !
2297      ALLOCATE( ztab_e(jpiglo,4+2*jpr2dj), znorthloc_e(jpi,4+2*jpr2dj), znorthgloio_e(jpi,4+2*jpr2dj,jpni) )
2298
2299      !
2300      ijpj=4
2301      ztab_e(:,:) = 0.e0
2302
2303      ij=0
2304      ! put in znorthloc_e the last 4 jlines of pt2d
2305      DO jj = nlcj - ijpj + 1 - jpr2dj, nlcj +jpr2dj
2306         ij = ij + 1
2307         DO ji = 1, jpi
2308            znorthloc_e(ji,ij)=pt2d(ji,jj)
2309         END DO
2310      END DO
2311      !
2312      itaille = jpi * ( ijpj + 2 * jpr2dj )
2313      CALL MPI_ALLGATHER( znorthloc_e(1,1)  , itaille, MPI_DOUBLE_PRECISION,    &
2314         &                znorthgloio_e(1,1,1), itaille, MPI_DOUBLE_PRECISION, ncomm_north, ierr )
2315      !
2316      DO jr = 1, ndim_rank_north            ! recover the global north array
2317         iproc = nrank_north(jr) + 1
2318         ildi = nldit (iproc)
2319         ilei = nleit (iproc)
2320         iilb = nimppt(iproc)
2321         DO jj = 1, ijpj+2*jpr2dj
2322            DO ji = ildi, ilei
2323               ztab_e(ji+iilb-1,jj) = znorthgloio_e(ji,jj,jr)
2324            END DO
2325         END DO
2326      END DO
2327
2328
2329      ! 2. North-Fold boundary conditions
2330      ! ----------------------------------
2331      CALL lbc_nfd( ztab_e(:,:), cd_type, psgn, pr2dj = jpr2dj )
2332
2333      ij = jpr2dj
2334      !! Scatter back to pt2d
2335      DO jj = nlcj - ijpj + 1 , nlcj +jpr2dj
2336      ij  = ij +1
2337         DO ji= 1, nlci
2338            pt2d(ji,jj) = ztab_e(ji+nimpp-1,ij)
2339         END DO
2340      END DO
2341      !
2342      DEALLOCATE( ztab_e, znorthloc_e, znorthgloio_e )
2343      !
2344   END SUBROUTINE mpp_lbc_north_e
2345
2346      SUBROUTINE mpp_lnk_bdy_3d( ptab, cd_type, psgn, ib_bdy )
2347      !!----------------------------------------------------------------------
2348      !!                  ***  routine mpp_lnk_bdy_3d  ***
2349      !!
2350      !! ** Purpose :   Message passing management
2351      !!
2352      !! ** Method  :   Use mppsend and mpprecv function for passing BDY boundaries
2353      !!      between processors following neighboring subdomains.
2354      !!            domain parameters
2355      !!                    nlci   : first dimension of the local subdomain
2356      !!                    nlcj   : second dimension of the local subdomain
2357      !!                    nbondi_bdy : mark for "east-west local boundary"
2358      !!                    nbondj_bdy : mark for "north-south local boundary"
2359      !!                    noea   : number for local neighboring processors
2360      !!                    nowe   : number for local neighboring processors
2361      !!                    noso   : number for local neighboring processors
2362      !!                    nono   : number for local neighboring processors
2363      !!
2364      !! ** Action  :   ptab with update value at its periphery
2365      !!
2366      !!----------------------------------------------------------------------
2367
2368      USE lbcnfd          ! north fold
2369
2370      INCLUDE 'mpif.h'
2371
2372      REAL(wp), DIMENSION(jpi,jpj,jpk), INTENT(inout) ::   ptab     ! 3D array on which the boundary condition is applied
2373      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type  ! define the nature of ptab array grid-points
2374      !                                                             ! = T , U , V , F , W points
2375      REAL(wp)                        , INTENT(in   ) ::   psgn     ! =-1 the sign change across the north fold boundary
2376      !                                                             ! =  1. , the sign is kept
2377      INTEGER                         , INTENT(in   ) ::   ib_bdy   ! BDY boundary set
2378      INTEGER  ::   ji, jj, jk, jl             ! dummy loop indices
2379      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
2380      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
2381      REAL(wp) ::   zland
2382      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
2383      !
2384      REAL(wp), DIMENSION(:,:,:,:), ALLOCATABLE ::   zt3ns, zt3sn   ! 3d for north-south & south-north
2385      REAL(wp), DIMENSION(:,:,:,:), ALLOCATABLE ::   zt3ew, zt3we   ! 3d for east-west & west-east
2386
2387      !!----------------------------------------------------------------------
2388     
2389      ALLOCATE( zt3ns(jpi,jprecj,jpk,2), zt3sn(jpi,jprecj,jpk,2),   &
2390         &      zt3ew(jpj,jpreci,jpk,2), zt3we(jpj,jpreci,jpk,2)  )
2391
2392      zland = 0.e0
2393
2394      ! 1. standard boundary treatment
2395      ! ------------------------------
2396     
2397      !                                   ! East-West boundaries
2398      !                                        !* Cyclic east-west
2399
2400      IF( nbondi == 2) THEN
2401        IF (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) THEN
2402          ptab( 1 ,:,:) = ptab(jpim1,:,:)
2403          ptab(jpi,:,:) = ptab(  2  ,:,:)
2404        ELSE
2405          IF( .NOT. cd_type == 'F' )   ptab(     1       :jpreci,:,:) = zland    ! south except F-point
2406          ptab(nlci-jpreci+1:jpi   ,:,:) = zland    ! north
2407        ENDIF
2408      ELSEIF(nbondi == -1) THEN
2409        IF( .NOT. cd_type == 'F' )   ptab(     1       :jpreci,:,:) = zland    ! south except F-point
2410      ELSEIF(nbondi == 1) THEN
2411        ptab(nlci-jpreci+1:jpi   ,:,:) = zland    ! north
2412      ENDIF                                     !* closed
2413
2414      IF (nbondj == 2 .OR. nbondj == -1) THEN
2415        IF( .NOT. cd_type == 'F' )   ptab(:,     1       :jprecj,:) = zland       ! south except F-point
2416      ELSEIF (nbondj == 2 .OR. nbondj == 1) THEN
2417        ptab(:,nlcj-jprecj+1:jpj   ,:) = zland       ! north
2418      ENDIF
2419     
2420      !
2421
2422      ! 2. East and west directions exchange
2423      ! ------------------------------------
2424      ! we play with the neigbours AND the row number because of the periodicity
2425      !
2426      SELECT CASE ( nbondi_bdy(ib_bdy) )      ! Read Dirichlet lateral conditions
2427      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
2428         iihom = nlci-nreci
2429         DO jl = 1, jpreci
2430            zt3ew(:,jl,:,1) = ptab(jpreci+jl,:,:)
2431            zt3we(:,jl,:,1) = ptab(iihom +jl,:,:)
2432         END DO
2433      END SELECT
2434      !
2435      !                           ! Migrations
2436      imigr = jpreci * jpj * jpk
2437      !
2438      SELECT CASE ( nbondi_bdy(ib_bdy) )
2439      CASE ( -1 )
2440         CALL mppsend( 2, zt3we(1,1,1,1), imigr, noea, ml_req1 )
2441      CASE ( 0 )
2442         CALL mppsend( 1, zt3ew(1,1,1,1), imigr, nowe, ml_req1 )
2443         CALL mppsend( 2, zt3we(1,1,1,1), imigr, noea, ml_req2 )
2444      CASE ( 1 )
2445         CALL mppsend( 1, zt3ew(1,1,1,1), imigr, nowe, ml_req1 )
2446      END SELECT
2447      !
2448      SELECT CASE ( nbondi_bdy_b(ib_bdy) )
2449      CASE ( -1 )
2450         CALL mpprecv( 1, zt3ew(1,1,1,2), imigr, noea )
2451      CASE ( 0 )
2452         CALL mpprecv( 1, zt3ew(1,1,1,2), imigr, noea )
2453         CALL mpprecv( 2, zt3we(1,1,1,2), imigr, nowe )
2454      CASE ( 1 )
2455         CALL mpprecv( 2, zt3we(1,1,1,2), imigr, nowe )
2456      END SELECT
2457      !
2458      SELECT CASE ( nbondi_bdy(ib_bdy) )
2459      CASE ( -1 )
2460         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2461      CASE ( 0 )
2462         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2463         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
2464      CASE ( 1 )
2465         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2466      END SELECT
2467      !
2468      !                           ! Write Dirichlet lateral conditions
2469      iihom = nlci-jpreci
2470      !
2471      SELECT CASE ( nbondi_bdy_b(ib_bdy) )
2472      CASE ( -1 )
2473         DO jl = 1, jpreci
2474            ptab(iihom+jl,:,:) = zt3ew(:,jl,:,2)
2475         END DO
2476      CASE ( 0 )
2477         DO jl = 1, jpreci
2478            ptab(jl      ,:,:) = zt3we(:,jl,:,2)
2479            ptab(iihom+jl,:,:) = zt3ew(:,jl,:,2)
2480         END DO
2481      CASE ( 1 )
2482         DO jl = 1, jpreci
2483            ptab(jl      ,:,:) = zt3we(:,jl,:,2)
2484         END DO
2485      END SELECT
2486
2487
2488      ! 3. North and south directions
2489      ! -----------------------------
2490      ! always closed : we play only with the neigbours
2491      !
2492      IF( nbondj_bdy(ib_bdy) /= 2 ) THEN      ! Read Dirichlet lateral conditions
2493         ijhom = nlcj-nrecj
2494         DO jl = 1, jprecj
2495            zt3sn(:,jl,:,1) = ptab(:,ijhom +jl,:)
2496            zt3ns(:,jl,:,1) = ptab(:,jprecj+jl,:)
2497         END DO
2498      ENDIF
2499      !
2500      !                           ! Migrations
2501      imigr = jprecj * jpi * jpk
2502      !
2503      SELECT CASE ( nbondj_bdy(ib_bdy) )
2504      CASE ( -1 )
2505         CALL mppsend( 4, zt3sn(1,1,1,1), imigr, nono, ml_req1 )
2506      CASE ( 0 )
2507         CALL mppsend( 3, zt3ns(1,1,1,1), imigr, noso, ml_req1 )
2508         CALL mppsend( 4, zt3sn(1,1,1,1), imigr, nono, ml_req2 )
2509      CASE ( 1 )
2510         CALL mppsend( 3, zt3ns(1,1,1,1), imigr, noso, ml_req1 )
2511      END SELECT
2512      !
2513      SELECT CASE ( nbondj_bdy_b(ib_bdy) )
2514      CASE ( -1 )
2515         CALL mpprecv( 3, zt3ns(1,1,1,2), imigr, nono )
2516      CASE ( 0 )
2517         CALL mpprecv( 3, zt3ns(1,1,1,2), imigr, nono )
2518         CALL mpprecv( 4, zt3sn(1,1,1,2), imigr, noso )
2519      CASE ( 1 )
2520         CALL mpprecv( 4, zt3sn(1,1,1,2), imigr, noso )
2521      END SELECT
2522      !
2523      SELECT CASE ( nbondj_bdy(ib_bdy) )
2524      CASE ( -1 )
2525         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2526      CASE ( 0 )
2527         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2528         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
2529      CASE ( 1 )
2530         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2531      END SELECT
2532      !
2533      !                           ! Write Dirichlet lateral conditions
2534      ijhom = nlcj-jprecj
2535      !
2536      SELECT CASE ( nbondj_bdy_b(ib_bdy) )
2537      CASE ( -1 )
2538         DO jl = 1, jprecj
2539            ptab(:,ijhom+jl,:) = zt3ns(:,jl,:,2)
2540         END DO
2541      CASE ( 0 )
2542         DO jl = 1, jprecj
2543            ptab(:,jl      ,:) = zt3sn(:,jl,:,2)
2544            ptab(:,ijhom+jl,:) = zt3ns(:,jl,:,2)
2545         END DO
2546      CASE ( 1 )
2547         DO jl = 1, jprecj
2548            ptab(:,jl,:) = zt3sn(:,jl,:,2)
2549         END DO
2550      END SELECT
2551
2552
2553      ! 4. north fold treatment
2554      ! -----------------------
2555      !
2556      IF( npolj /= 0) THEN
2557         !
2558         SELECT CASE ( jpni )
2559         CASE ( 1 )     ;   CALL lbc_nfd      ( ptab, cd_type, psgn )   ! only 1 northern proc, no mpp
2560         CASE DEFAULT   ;   CALL mpp_lbc_north( ptab, cd_type, psgn )   ! for all northern procs.
2561         END SELECT
2562         !
2563      ENDIF
2564      !
2565      DEALLOCATE( zt3ns, zt3sn, zt3ew, zt3we  )
2566      !
2567   END SUBROUTINE mpp_lnk_bdy_3d
2568
2569      SUBROUTINE mpp_lnk_bdy_2d( ptab, cd_type, psgn, ib_bdy )
2570      !!----------------------------------------------------------------------
2571      !!                  ***  routine mpp_lnk_bdy_2d  ***
2572      !!
2573      !! ** Purpose :   Message passing management
2574      !!
2575      !! ** Method  :   Use mppsend and mpprecv function for passing BDY boundaries
2576      !!      between processors following neighboring subdomains.
2577      !!            domain parameters
2578      !!                    nlci   : first dimension of the local subdomain
2579      !!                    nlcj   : second dimension of the local subdomain
2580      !!                    nbondi_bdy : mark for "east-west local boundary"
2581      !!                    nbondj_bdy : mark for "north-south local boundary"
2582      !!                    noea   : number for local neighboring processors
2583      !!                    nowe   : number for local neighboring processors
2584      !!                    noso   : number for local neighboring processors
2585      !!                    nono   : number for local neighboring processors
2586      !!
2587      !! ** Action  :   ptab with update value at its periphery
2588      !!
2589      !!----------------------------------------------------------------------
2590
2591      USE lbcnfd          ! north fold
2592
2593      INCLUDE 'mpif.h'
2594
2595      REAL(wp), DIMENSION(jpi,jpj)    , INTENT(inout) ::   ptab     ! 3D array on which the boundary condition is applied
2596      CHARACTER(len=1)                , INTENT(in   ) ::   cd_type  ! define the nature of ptab array grid-points
2597      !                                                             ! = T , U , V , F , W points
2598      REAL(wp)                        , INTENT(in   ) ::   psgn     ! =-1 the sign change across the north fold boundary
2599      !                                                             ! =  1. , the sign is kept
2600      INTEGER                         , INTENT(in   ) ::   ib_bdy   ! BDY boundary set
2601      INTEGER  ::   ji, jj, jl             ! dummy loop indices
2602      INTEGER  ::   imigr, iihom, ijhom        ! temporary integers
2603      INTEGER  ::   ml_req1, ml_req2, ml_err   ! for key_mpi_isend
2604      REAL(wp) ::   zland
2605      INTEGER, DIMENSION(MPI_STATUS_SIZE) ::   ml_stat   ! for key_mpi_isend
2606      !
2607      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE ::  zt2ns, zt2sn   ! 2d for north-south & south-north
2608      REAL(wp), DIMENSION(:,:,:), ALLOCATABLE ::  zt2ew, zt2we   ! 2d for east-west & west-east
2609
2610      !!----------------------------------------------------------------------
2611
2612      ALLOCATE( zt2ns(jpi,jprecj,2), zt2sn(jpi,jprecj,2),  &
2613         &      zt2ew(jpj,jpreci,2), zt2we(jpj,jpreci,2)   )
2614
2615      zland = 0.e0
2616
2617      ! 1. standard boundary treatment
2618      ! ------------------------------
2619     
2620      !                                   ! East-West boundaries
2621      !                                        !* Cyclic east-west
2622
2623      IF( nbondi == 2) THEN
2624        IF (nperio == 1 .OR. nperio == 4 .OR. nperio == 6) THEN
2625          ptab( 1 ,:) = ptab(jpim1,:)
2626          ptab(jpi,:) = ptab(  2  ,:)
2627        ELSE
2628          IF( .NOT. cd_type == 'F' )   ptab(     1       :jpreci,:) = zland    ! south except F-point
2629          ptab(nlci-jpreci+1:jpi   ,:) = zland    ! north
2630        ENDIF
2631      ELSEIF(nbondi == -1) THEN
2632        IF( .NOT. cd_type == 'F' )   ptab(     1       :jpreci,:) = zland    ! south except F-point
2633      ELSEIF(nbondi == 1) THEN
2634        ptab(nlci-jpreci+1:jpi   ,:) = zland    ! north
2635      ENDIF                                     !* closed
2636
2637      IF (nbondj == 2 .OR. nbondj == -1) THEN
2638        IF( .NOT. cd_type == 'F' )   ptab(:,     1       :jprecj) = zland       ! south except F-point
2639      ELSEIF (nbondj == 2 .OR. nbondj == 1) THEN
2640        ptab(:,nlcj-jprecj+1:jpj) = zland       ! north
2641      ENDIF
2642     
2643      !
2644
2645      ! 2. East and west directions exchange
2646      ! ------------------------------------
2647      ! we play with the neigbours AND the row number because of the periodicity
2648      !
2649      SELECT CASE ( nbondi_bdy(ib_bdy) )      ! Read Dirichlet lateral conditions
2650      CASE ( -1, 0, 1 )                ! all exept 2 (i.e. close case)
2651         iihom = nlci-nreci
2652         DO jl = 1, jpreci
2653            zt2ew(:,jl,1) = ptab(jpreci+jl,:)
2654            zt2we(:,jl,1) = ptab(iihom +jl,:)
2655         END DO
2656      END SELECT
2657      !
2658      !                           ! Migrations
2659      imigr = jpreci * jpj
2660      !
2661      SELECT CASE ( nbondi_bdy(ib_bdy) )
2662      CASE ( -1 )
2663         CALL mppsend( 2, zt2we(1,1,1), imigr, noea, ml_req1 )
2664      CASE ( 0 )
2665         CALL mppsend( 1, zt2ew(1,1,1), imigr, nowe, ml_req1 )
2666         CALL mppsend( 2, zt2we(1,1,1), imigr, noea, ml_req2 )
2667      CASE ( 1 )
2668         CALL mppsend( 1, zt2ew(1,1,1), imigr, nowe, ml_req1 )
2669      END SELECT
2670      !
2671      SELECT CASE ( nbondi_bdy_b(ib_bdy) )
2672      CASE ( -1 )
2673         CALL mpprecv( 1, zt2ew(1,1,2), imigr, noea )
2674      CASE ( 0 )
2675         CALL mpprecv( 1, zt2ew(1,1,2), imigr, noea )
2676         CALL mpprecv( 2, zt2we(1,1,2), imigr, nowe )
2677      CASE ( 1 )
2678         CALL mpprecv( 2, zt2we(1,1,2), imigr, nowe )
2679      END SELECT
2680      !
2681      SELECT CASE ( nbondi_bdy(ib_bdy) )
2682      CASE ( -1 )
2683         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2684      CASE ( 0 )
2685         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2686         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
2687      CASE ( 1 )
2688         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2689      END SELECT
2690      !
2691      !                           ! Write Dirichlet lateral conditions
2692      iihom = nlci-jpreci
2693      !
2694      SELECT CASE ( nbondi_bdy_b(ib_bdy) )
2695      CASE ( -1 )
2696         DO jl = 1, jpreci
2697            ptab(iihom+jl,:) = zt2ew(:,jl,2)
2698         END DO
2699      CASE ( 0 )
2700         DO jl = 1, jpreci
2701            ptab(jl      ,:) = zt2we(:,jl,2)
2702            ptab(iihom+jl,:) = zt2ew(:,jl,2)
2703         END DO
2704      CASE ( 1 )
2705         DO jl = 1, jpreci
2706            ptab(jl      ,:) = zt2we(:,jl,2)
2707         END DO
2708      END SELECT
2709
2710
2711      ! 3. North and south directions
2712      ! -----------------------------
2713      ! always closed : we play only with the neigbours
2714      !
2715      IF( nbondj_bdy(ib_bdy) /= 2 ) THEN      ! Read Dirichlet lateral conditions
2716         ijhom = nlcj-nrecj
2717         DO jl = 1, jprecj
2718            zt2sn(:,jl,1) = ptab(:,ijhom +jl)
2719            zt2ns(:,jl,1) = ptab(:,jprecj+jl)
2720         END DO
2721      ENDIF
2722      !
2723      !                           ! Migrations
2724      imigr = jprecj * jpi
2725      !
2726      SELECT CASE ( nbondj_bdy(ib_bdy) )
2727      CASE ( -1 )
2728         CALL mppsend( 4, zt2sn(1,1,1), imigr, nono, ml_req1 )
2729      CASE ( 0 )
2730         CALL mppsend( 3, zt2ns(1,1,1), imigr, noso, ml_req1 )
2731         CALL mppsend( 4, zt2sn(1,1,1), imigr, nono, ml_req2 )
2732      CASE ( 1 )
2733         CALL mppsend( 3, zt2ns(1,1,1), imigr, noso, ml_req1 )
2734      END SELECT
2735      !
2736      SELECT CASE ( nbondj_bdy_b(ib_bdy) )
2737      CASE ( -1 )
2738         CALL mpprecv( 3, zt2ns(1,1,2), imigr, nono )
2739      CASE ( 0 )
2740         CALL mpprecv( 3, zt2ns(1,1,2), imigr, nono )
2741         CALL mpprecv( 4, zt2sn(1,1,2), imigr, noso )
2742      CASE ( 1 )
2743         CALL mpprecv( 4, zt2sn(1,1,2), imigr, noso )
2744      END SELECT
2745      !
2746      SELECT CASE ( nbondj_bdy(ib_bdy) )
2747      CASE ( -1 )
2748         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2749      CASE ( 0 )
2750         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2751         IF(l_isend) CALL mpi_wait(ml_req2, ml_stat, ml_err)
2752      CASE ( 1 )
2753         IF(l_isend) CALL mpi_wait(ml_req1, ml_stat, ml_err)
2754      END SELECT
2755      !
2756      !                           ! Write Dirichlet lateral conditions
2757      ijhom = nlcj-jprecj
2758      !
2759      SELECT CASE ( nbondj_bdy_b(ib_bdy) )
2760      CASE ( -1 )
2761         DO jl = 1, jprecj
2762            ptab(:,ijhom+jl) = zt2ns(:,jl,2)
2763         END DO
2764      CASE ( 0 )
2765         DO jl = 1, jprecj
2766            ptab(:,jl      ) = zt2sn(:,jl,2)
2767            ptab(:,ijhom+jl) = zt2ns(:,jl,2)
2768         END DO
2769      CASE ( 1 )
2770         DO jl = 1, jprecj
2771            ptab(:,jl) = zt2sn(:,jl,2)
2772         END DO
2773      END SELECT
2774
2775
2776      ! 4. north fold treatment
2777      ! -----------------------
2778      !
2779      IF( npolj /= 0) THEN
2780         !
2781         SELECT CASE ( jpni )
2782         CASE ( 1 )     ;   CALL lbc_nfd      ( ptab, cd_type, psgn )   ! only 1 northern proc, no mpp
2783         CASE DEFAULT   ;   CALL mpp_lbc_north( ptab, cd_type, psgn )   ! for all northern procs.
2784         END SELECT
2785         !
2786      ENDIF
2787      !
2788      DEALLOCATE( zt2ns, zt2sn, zt2ew, zt2we  )
2789      !
2790   END SUBROUTINE mpp_lnk_bdy_2d
2791
2792   SUBROUTINE mpi_init_opa( ldtxt, ksft, code )
2793      !!---------------------------------------------------------------------
2794      !!                   ***  routine mpp_init.opa  ***
2795      !!
2796      !! ** Purpose :: export and attach a MPI buffer for bsend
2797      !!
2798      !! ** Method  :: define buffer size in namelist, if 0 no buffer attachment
2799      !!            but classical mpi_init
2800      !!
2801      !! History :: 01/11 :: IDRIS initial version for IBM only
2802      !!            08/04 :: R. Benshila, generalisation
2803      !!---------------------------------------------------------------------
2804      CHARACTER(len=*),DIMENSION(:), INTENT(  out) ::   ldtxt
2805      INTEGER                      , INTENT(inout) ::   ksft
2806      INTEGER                      , INTENT(  out) ::   code
2807      INTEGER                                      ::   ierr, ji
2808      LOGICAL                                      ::   mpi_was_called
2809      !!---------------------------------------------------------------------
2810      !
2811      CALL mpi_initialized( mpi_was_called, code )      ! MPI initialization
2812      IF ( code /= MPI_SUCCESS ) THEN
2813         DO ji = 1, SIZE(ldtxt)
2814            IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
2815         END DO
2816         WRITE(*, cform_err)
2817         WRITE(*, *) ' lib_mpp: Error in routine mpi_initialized'
2818         CALL mpi_abort( mpi_comm_world, code, ierr )
2819      ENDIF
2820      !
2821      IF( .NOT. mpi_was_called ) THEN
2822         CALL mpi_init( code )
2823         CALL mpi_comm_dup( mpi_comm_world, mpi_comm_opa, code )
2824         IF ( code /= MPI_SUCCESS ) THEN
2825            DO ji = 1, SIZE(ldtxt)
2826               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
2827            END DO
2828            WRITE(*, cform_err)
2829            WRITE(*, *) ' lib_mpp: Error in routine mpi_comm_dup'
2830            CALL mpi_abort( mpi_comm_world, code, ierr )
2831         ENDIF
2832      ENDIF
2833      !
2834      IF( nn_buffer > 0 ) THEN
2835         WRITE(ldtxt(ksft),*) 'mpi_bsend, buffer allocation of  : ', nn_buffer   ;   ksft = ksft + 1
2836         ! Buffer allocation and attachment
2837         ALLOCATE( tampon(nn_buffer), stat = ierr )
2838         IF( ierr /= 0 ) THEN
2839            DO ji = 1, SIZE(ldtxt)
2840               IF( TRIM(ldtxt(ji)) /= '' )   WRITE(*,*) ldtxt(ji)      ! control print of mynode
2841            END DO
2842            WRITE(*, cform_err)
2843            WRITE(*, *) ' lib_mpp: Error in ALLOCATE', ierr
2844            CALL mpi_abort( mpi_comm_world, code, ierr )
2845         END IF
2846         CALL mpi_buffer_attach( tampon, nn_buffer, code )
2847      ENDIF
2848      !
2849   END SUBROUTINE mpi_init_opa
2850
2851   SUBROUTINE DDPDD_MPI (ydda, yddb, ilen, itype)
2852      !!---------------------------------------------------------------------
2853      !!   Routine DDPDD_MPI: used by reduction operator MPI_SUMDD
2854      !!
2855      !!   Modification of original codes written by David H. Bailey
2856      !!   This subroutine computes yddb(i) = ydda(i)+yddb(i)
2857      !!---------------------------------------------------------------------
2858      INTEGER, INTENT(in)                         :: ilen, itype
2859      COMPLEX(wp), DIMENSION(ilen), INTENT(in)     :: ydda
2860      COMPLEX(wp), DIMENSION(ilen), INTENT(inout)  :: yddb
2861      !
2862      REAL(wp) :: zerr, zt1, zt2    ! local work variables
2863      INTEGER :: ji, ztmp           ! local scalar
2864
2865      ztmp = itype   ! avoid compilation warning
2866
2867      DO ji=1,ilen
2868      ! Compute ydda + yddb using Knuth's trick.
2869         zt1  = real(ydda(ji)) + real(yddb(ji))
2870         zerr = zt1 - real(ydda(ji))
2871         zt2  = ((real(yddb(ji)) - zerr) + (real(ydda(ji)) - (zt1 - zerr))) &
2872                + aimag(ydda(ji)) + aimag(yddb(ji))
2873
2874         ! The result is zt1 + zt2, after normalization.
2875         yddb(ji) = cmplx ( zt1 + zt2, zt2 - ((zt1 + zt2) - zt1),wp )
2876      END DO
2877
2878   END SUBROUTINE DDPDD_MPI
2879
2880#else
2881   !!----------------------------------------------------------------------
2882   !!   Default case:            Dummy module        share memory computing
2883   !!----------------------------------------------------------------------
2884   USE in_out_manager
2885
2886   INTERFACE mpp_sum
2887      MODULE PROCEDURE mpp_sum_a2s, mpp_sum_as, mpp_sum_ai, mpp_sum_s, mpp_sum_i, mppsum_realdd, mppsum_a_realdd
2888   END INTERFACE
2889   INTERFACE mpp_max
2890      MODULE PROCEDURE mppmax_a_int, mppmax_int, mppmax_a_real, mppmax_real
2891   END INTERFACE
2892   INTERFACE mpp_min
2893      MODULE PROCEDURE mppmin_a_int, mppmin_int, mppmin_a_real, mppmin_real
2894   END INTERFACE
2895   INTERFACE mpp_minloc
2896      MODULE PROCEDURE mpp_minloc2d ,mpp_minloc3d
2897   END INTERFACE
2898   INTERFACE mpp_maxloc
2899      MODULE PROCEDURE mpp_maxloc2d ,mpp_maxloc3d
2900   END INTERFACE
2901
2902   LOGICAL, PUBLIC, PARAMETER ::   lk_mpp = .FALSE.      !: mpp flag
2903   LOGICAL, PUBLIC            ::   ln_nnogather          !: namelist control of northfold comms (needed here in case "key_mpp_mpi" is not used)
2904   INTEGER :: ncomm_ice
2905   !!----------------------------------------------------------------------
2906CONTAINS
2907
2908   INTEGER FUNCTION lib_mpp_alloc(kumout)          ! Dummy function
2909      INTEGER, INTENT(in) ::   kumout
2910      lib_mpp_alloc = 0
2911   END FUNCTION lib_mpp_alloc
2912
2913   FUNCTION mynode( ldtxt, kumnam_ref, knumnam_cfg,  kumond , kstop, localComm ) RESULT (function_value)
2914      INTEGER, OPTIONAL            , INTENT(in   ) ::   localComm
2915      CHARACTER(len=*),DIMENSION(:) ::   ldtxt
2916      INTEGER ::   kumnam_ref, knumnam_cfg , kumond , kstop
2917      IF( PRESENT( localComm ) .OR. .NOT.PRESENT( localComm ) )   function_value = 0
2918      IF( .FALSE. )   ldtxt(:) = 'never done'
2919      CALL ctl_opn( kumond, 'output.namelist.dyn', 'UNKNOWN', 'FORMATTED', 'SEQUENTIAL', -1, 6, .FALSE. , 1 )
2920   END FUNCTION mynode
2921
2922   SUBROUTINE mppsync                       ! Dummy routine
2923   END SUBROUTINE mppsync
2924
2925   SUBROUTINE mpp_sum_as( parr, kdim, kcom )      ! Dummy routine
2926      REAL   , DIMENSION(:) :: parr
2927      INTEGER               :: kdim
2928      INTEGER, OPTIONAL     :: kcom
2929      WRITE(*,*) 'mpp_sum_as: You should not have seen this print! error?', kdim, parr(1), kcom
2930   END SUBROUTINE mpp_sum_as
2931
2932   SUBROUTINE mpp_sum_a2s( parr, kdim, kcom )      ! Dummy routine
2933      REAL   , DIMENSION(:,:) :: parr
2934      INTEGER               :: kdim
2935      INTEGER, OPTIONAL     :: kcom
2936      WRITE(*,*) 'mpp_sum_a2s: You should not have seen this print! error?', kdim, parr(1,1), kcom
2937   END SUBROUTINE mpp_sum_a2s
2938
2939   SUBROUTINE mpp_sum_ai( karr, kdim, kcom )      ! Dummy routine
2940      INTEGER, DIMENSION(:) :: karr
2941      INTEGER               :: kdim
2942      INTEGER, OPTIONAL     :: kcom
2943      WRITE(*,*) 'mpp_sum_ai: You should not have seen this print! error?', kdim, karr(1), kcom
2944   END SUBROUTINE mpp_sum_ai
2945
2946   SUBROUTINE mpp_sum_s( psca, kcom )            ! Dummy routine
2947      REAL                  :: psca
2948      INTEGER, OPTIONAL     :: kcom
2949      WRITE(*,*) 'mpp_sum_s: You should not have seen this print! error?', psca, kcom
2950   END SUBROUTINE mpp_sum_s
2951
2952   SUBROUTINE mpp_sum_i( kint, kcom )            ! Dummy routine
2953      integer               :: kint
2954      INTEGER, OPTIONAL     :: kcom
2955      WRITE(*,*) 'mpp_sum_i: You should not have seen this print! error?', kint, kcom
2956   END SUBROUTINE mpp_sum_i
2957
2958   SUBROUTINE mppsum_realdd( ytab, kcom )
2959      COMPLEX(wp), INTENT(inout)         :: ytab    ! input scalar
2960      INTEGER , INTENT( in  ), OPTIONAL :: kcom
2961      WRITE(*,*) 'mppsum_realdd: You should not have seen this print! error?', ytab
2962   END SUBROUTINE mppsum_realdd
2963
2964   SUBROUTINE mppsum_a_realdd( ytab, kdim, kcom )
2965      INTEGER , INTENT( in )                        ::   kdim      ! size of ytab
2966      COMPLEX(wp), DIMENSION(kdim), INTENT( inout ) ::   ytab      ! input array
2967      INTEGER , INTENT( in  ), OPTIONAL :: kcom
2968      WRITE(*,*) 'mppsum_a_realdd: You should not have seen this print! error?', kdim, ytab(1), kcom
2969   END SUBROUTINE mppsum_a_realdd
2970
2971   SUBROUTINE mppmax_a_real( parr, kdim, kcom )
2972      REAL   , DIMENSION(:) :: parr
2973      INTEGER               :: kdim
2974      INTEGER, OPTIONAL     :: kcom
2975      WRITE(*,*) 'mppmax_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
2976   END SUBROUTINE mppmax_a_real
2977
2978   SUBROUTINE mppmax_real( psca, kcom )
2979      REAL                  :: psca
2980      INTEGER, OPTIONAL     :: kcom
2981      WRITE(*,*) 'mppmax_real: You should not have seen this print! error?', psca, kcom
2982   END SUBROUTINE mppmax_real
2983
2984   SUBROUTINE mppmin_a_real( parr, kdim, kcom )
2985      REAL   , DIMENSION(:) :: parr
2986      INTEGER               :: kdim
2987      INTEGER, OPTIONAL     :: kcom
2988      WRITE(*,*) 'mppmin_a_real: You should not have seen this print! error?', kdim, parr(1), kcom
2989   END SUBROUTINE mppmin_a_real
2990
2991   SUBROUTINE mppmin_real( psca, kcom )
2992      REAL                  :: psca
2993      INTEGER, OPTIONAL     :: kcom
2994      WRITE(*,*) 'mppmin_real: You should not have seen this print! error?', psca, kcom
2995   END SUBROUTINE mppmin_real
2996
2997   SUBROUTINE mppmax_a_int( karr, kdim ,kcom)
2998      INTEGER, DIMENSION(:) :: karr
2999      INTEGER               :: kdim
3000      INTEGER, OPTIONAL     :: kcom
3001      WRITE(*,*) 'mppmax_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
3002   END SUBROUTINE mppmax_a_int
3003
3004   SUBROUTINE mppmax_int( kint, kcom)
3005      INTEGER               :: kint
3006      INTEGER, OPTIONAL     :: kcom
3007      WRITE(*,*) 'mppmax_int: You should not have seen this print! error?', kint, kcom
3008   END SUBROUTINE mppmax_int
3009
3010   SUBROUTINE mppmin_a_int( karr, kdim, kcom )
3011      INTEGER, DIMENSION(:) :: karr
3012      INTEGER               :: kdim
3013      INTEGER, OPTIONAL     :: kcom
3014      WRITE(*,*) 'mppmin_a_int: You should not have seen this print! error?', kdim, karr(1), kcom
3015   END SUBROUTINE mppmin_a_int
3016
3017   SUBROUTINE mppmin_int( kint, kcom )
3018      INTEGER               :: kint
3019      INTEGER, OPTIONAL     :: kcom
3020      WRITE(*,*) 'mppmin_int: You should not have seen this print! error?', kint, kcom
3021   END SUBROUTINE mppmin_int
3022
3023   SUBROUTINE mpp_minloc2d( ptab, pmask, pmin, ki, kj )
3024      REAL                   :: pmin
3025      REAL , DIMENSION (:,:) :: ptab, pmask
3026      INTEGER :: ki, kj
3027      WRITE(*,*) 'mpp_minloc2d: You should not have seen this print! error?', pmin, ki, kj, ptab(1,1), pmask(1,1)
3028   END SUBROUTINE mpp_minloc2d
3029
3030   SUBROUTINE mpp_minloc3d( ptab, pmask, pmin, ki, kj, kk )
3031      REAL                     :: pmin
3032      REAL , DIMENSION (:,:,:) :: ptab, pmask
3033      INTEGER :: ki, kj, kk
3034      WRITE(*,*) 'mpp_minloc3d: You should not have seen this print! error?', pmin, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
3035   END SUBROUTINE mpp_minloc3d
3036
3037   SUBROUTINE mpp_maxloc2d( ptab, pmask, pmax, ki, kj )
3038      REAL                   :: pmax
3039      REAL , DIMENSION (:,:) :: ptab, pmask
3040      INTEGER :: ki, kj
3041      WRITE(*,*) 'mpp_maxloc2d: You should not have seen this print! error?', pmax, ki, kj, ptab(1,1), pmask(1,1)
3042   END SUBROUTINE mpp_maxloc2d
3043
3044   SUBROUTINE mpp_maxloc3d( ptab, pmask, pmax, ki, kj, kk )
3045      REAL                     :: pmax
3046      REAL , DIMENSION (:,:,:) :: ptab, pmask
3047      INTEGER :: ki, kj, kk
3048      WRITE(*,*) 'mpp_maxloc3d: You should not have seen this print! error?', pmax, ki, kj, kk, ptab(1,1,1), pmask(1,1,1)
3049   END SUBROUTINE mpp_maxloc3d
3050
3051   SUBROUTINE mppstop
3052      STOP      ! non MPP case, just stop the run
3053   END SUBROUTINE mppstop
3054
3055   SUBROUTINE mpp_ini_ice( kcom, knum )
3056      INTEGER :: kcom, knum
3057      WRITE(*,*) 'mpp_ini_ice: You should not have seen this print! error?', kcom, knum
3058   END SUBROUTINE mpp_ini_ice
3059
3060   SUBROUTINE mpp_ini_znl( knum )
3061      INTEGER :: knum
3062      WRITE(*,*) 'mpp_ini_znl: You should not have seen this print! error?', knum
3063   END SUBROUTINE mpp_ini_znl
3064
3065   SUBROUTINE mpp_comm_free( kcom )
3066      INTEGER :: kcom
3067      WRITE(*,*) 'mpp_comm_free: You should not have seen this print! error?', kcom
3068   END SUBROUTINE mpp_comm_free
3069#endif
3070
3071   !!----------------------------------------------------------------------
3072   !!   All cases:         ctl_stop, ctl_warn, get_unit, ctl_opn, ctl_nam   routines
3073   !!----------------------------------------------------------------------
3074
3075   SUBROUTINE ctl_stop( cd1, cd2, cd3, cd4, cd5 ,   &
3076      &                 cd6, cd7, cd8, cd9, cd10 )
3077      !!----------------------------------------------------------------------
3078      !!                  ***  ROUTINE  stop_opa  ***
3079      !!
3080      !! ** Purpose :   print in ocean.outpput file a error message and
3081      !!                increment the error number (nstop) by one.
3082      !!----------------------------------------------------------------------
3083      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
3084      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
3085      !!----------------------------------------------------------------------
3086      !
3087      nstop = nstop + 1
3088      IF(lwp) THEN
3089         WRITE(numout,cform_err)
3090         IF( PRESENT(cd1 ) )   WRITE(numout,*) cd1
3091         IF( PRESENT(cd2 ) )   WRITE(numout,*) cd2
3092         IF( PRESENT(cd3 ) )   WRITE(numout,*) cd3
3093         IF( PRESENT(cd4 ) )   WRITE(numout,*) cd4
3094         IF( PRESENT(cd5 ) )   WRITE(numout,*) cd5
3095         IF( PRESENT(cd6 ) )   WRITE(numout,*) cd6
3096         IF( PRESENT(cd7 ) )   WRITE(numout,*) cd7
3097         IF( PRESENT(cd8 ) )   WRITE(numout,*) cd8
3098         IF( PRESENT(cd9 ) )   WRITE(numout,*) cd9
3099         IF( PRESENT(cd10) )   WRITE(numout,*) cd10
3100      ENDIF
3101                               CALL FLUSH(numout    )
3102      IF( numstp     /= -1 )   CALL FLUSH(numstp    )
3103      IF( numsol     /= -1 )   CALL FLUSH(numsol    )
3104      IF( numevo_ice /= -1 )   CALL FLUSH(numevo_ice)
3105      !
3106      IF( cd1 == 'STOP' ) THEN
3107         IF(lwp) WRITE(numout,*)  'huge E-R-R-O-R : immediate stop'
3108         CALL mppstop()
3109      ENDIF
3110      !
3111   END SUBROUTINE ctl_stop
3112
3113
3114   SUBROUTINE ctl_warn( cd1, cd2, cd3, cd4, cd5,   &
3115      &                 cd6, cd7, cd8, cd9, cd10 )
3116      !!----------------------------------------------------------------------
3117      !!                  ***  ROUTINE  stop_warn  ***
3118      !!
3119      !! ** Purpose :   print in ocean.outpput file a error message and
3120      !!                increment the warning number (nwarn) by one.
3121      !!----------------------------------------------------------------------
3122      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd1, cd2, cd3, cd4, cd5
3123      CHARACTER(len=*), INTENT(in), OPTIONAL ::  cd6, cd7, cd8, cd9, cd10
3124      !!----------------------------------------------------------------------
3125      !
3126      nwarn = nwarn + 1
3127      IF(lwp) THEN
3128         WRITE(numout,cform_war)
3129         IF( PRESENT(cd1 ) ) WRITE(numout,*) cd1
3130         IF( PRESENT(cd2 ) ) WRITE(numout,*) cd2
3131         IF( PRESENT(cd3 ) ) WRITE(numout,*) cd3
3132         IF( PRESENT(cd4 ) ) WRITE(numout,*) cd4
3133         IF( PRESENT(cd5 ) ) WRITE(numout,*) cd5
3134         IF( PRESENT(cd6 ) ) WRITE(numout,*) cd6
3135         IF( PRESENT(cd7 ) ) WRITE(numout,*) cd7
3136         IF( PRESENT(cd8 ) ) WRITE(numout,*) cd8
3137         IF( PRESENT(cd9 ) ) WRITE(numout,*) cd9
3138         IF( PRESENT(cd10) ) WRITE(numout,*) cd10
3139      ENDIF
3140      CALL FLUSH(numout)
3141      !
3142   END SUBROUTINE ctl_warn
3143
3144
3145   SUBROUTINE ctl_opn( knum, cdfile, cdstat, cdform, cdacce, klengh, kout, ldwp, karea )
3146      !!----------------------------------------------------------------------
3147      !!                  ***  ROUTINE ctl_opn  ***
3148      !!
3149      !! ** Purpose :   Open file and check if required file is available.
3150      !!
3151      !! ** Method  :   Fortan open
3152      !!----------------------------------------------------------------------
3153      INTEGER          , INTENT(  out) ::   knum      ! logical unit to open
3154      CHARACTER(len=*) , INTENT(in   ) ::   cdfile    ! file name to open
3155      CHARACTER(len=*) , INTENT(in   ) ::   cdstat    ! disposition specifier
3156      CHARACTER(len=*) , INTENT(in   ) ::   cdform    ! formatting specifier
3157      CHARACTER(len=*) , INTENT(in   ) ::   cdacce    ! access specifier
3158      INTEGER          , INTENT(in   ) ::   klengh    ! record length
3159      INTEGER          , INTENT(in   ) ::   kout      ! number of logical units for write
3160      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
3161      INTEGER, OPTIONAL, INTENT(in   ) ::   karea     ! proc number
3162      !!
3163      CHARACTER(len=80) ::   clfile
3164      INTEGER           ::   iost
3165      !!----------------------------------------------------------------------
3166
3167      ! adapt filename
3168      ! ----------------
3169      clfile = TRIM(cdfile)
3170      IF( PRESENT( karea ) ) THEN
3171         IF( karea > 1 )   WRITE(clfile, "(a,'_',i4.4)") TRIM(clfile), karea-1
3172      ENDIF
3173#if defined key_agrif
3174      IF( .NOT. Agrif_Root() )   clfile = TRIM(Agrif_CFixed())//'_'//TRIM(clfile)
3175      knum=Agrif_Get_Unit()
3176#else
3177      knum=get_unit()
3178#endif
3179
3180      iost=0
3181      IF( cdacce(1:6) == 'DIRECT' )  THEN
3182         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat, RECL=klengh, ERR=100, IOSTAT=iost )
3183      ELSE
3184         OPEN( UNIT=knum, FILE=clfile, FORM=cdform, ACCESS=cdacce, STATUS=cdstat             , ERR=100, IOSTAT=iost )
3185      ENDIF
3186      IF( iost == 0 ) THEN
3187         IF(ldwp) THEN
3188            WRITE(kout,*) '     file   : ', clfile,' open ok'
3189            WRITE(kout,*) '     unit   = ', knum
3190            WRITE(kout,*) '     status = ', cdstat
3191            WRITE(kout,*) '     form   = ', cdform
3192            WRITE(kout,*) '     access = ', cdacce
3193            WRITE(kout,*)
3194         ENDIF
3195      ENDIF
3196100   CONTINUE
3197      IF( iost /= 0 ) THEN
3198         IF(ldwp) THEN
3199            WRITE(kout,*)
3200            WRITE(kout,*) ' ===>>>> : bad opening file: ', clfile
3201            WRITE(kout,*) ' =======   ===  '
3202            WRITE(kout,*) '           unit   = ', knum
3203            WRITE(kout,*) '           status = ', cdstat
3204            WRITE(kout,*) '           form   = ', cdform
3205            WRITE(kout,*) '           access = ', cdacce
3206            WRITE(kout,*) '           iostat = ', iost
3207            WRITE(kout,*) '           we stop. verify the file '
3208            WRITE(kout,*)
3209         ENDIF
3210         STOP 'ctl_opn bad opening'
3211      ENDIF
3212
3213   END SUBROUTINE ctl_opn
3214
3215   SUBROUTINE ctl_nam ( kios, cdnam, ldwp )
3216      !!----------------------------------------------------------------------
3217      !!                  ***  ROUTINE ctl_nam  ***
3218      !!
3219      !! ** Purpose :   Informations when error while reading a namelist
3220      !!
3221      !! ** Method  :   Fortan open
3222      !!----------------------------------------------------------------------
3223      INTEGER          , INTENT(inout) ::   kios      ! IO status after reading the namelist
3224      CHARACTER(len=*) , INTENT(in   ) ::   cdnam     ! group name of namelist for which error occurs
3225      CHARACTER(len=4)                 ::   clios     ! string to convert iostat in character for print
3226      LOGICAL          , INTENT(in   ) ::   ldwp      ! boolean term for print
3227      !!----------------------------------------------------------------------
3228
3229      !
3230      ! ----------------
3231      WRITE (clios, '(I4.0)') kios
3232      IF( kios < 0 ) THEN         
3233         CALL ctl_warn( 'W A R N I N G:  end of record or file while reading namelist ' &
3234 &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
3235      ENDIF
3236
3237      IF( kios > 0 ) THEN
3238         CALL ctl_stop( 'E R R O R :   misspelled variable in namelist ' &
3239 &           // TRIM(cdnam) // ' iostat = ' // TRIM(clios) )
3240      ENDIF
3241      kios = 0
3242      RETURN
3243     
3244   END SUBROUTINE ctl_nam
3245
3246   INTEGER FUNCTION get_unit()
3247      !!----------------------------------------------------------------------
3248      !!                  ***  FUNCTION  get_unit  ***
3249      !!
3250      !! ** Purpose :   return the index of an unused logical unit
3251      !!----------------------------------------------------------------------
3252      LOGICAL :: llopn
3253      !!----------------------------------------------------------------------
3254      !
3255      get_unit = 15   ! choose a unit that is big enough then it is not already used in NEMO
3256      llopn = .TRUE.
3257      DO WHILE( (get_unit < 998) .AND. llopn )
3258         get_unit = get_unit + 1
3259         INQUIRE( unit = get_unit, opened = llopn )
3260      END DO
3261      IF( (get_unit == 999) .AND. llopn ) THEN
3262         CALL ctl_stop( 'get_unit: All logical units until 999 are used...' )
3263         get_unit = -1
3264      ENDIF
3265      !
3266   END FUNCTION get_unit
3267
3268   !!----------------------------------------------------------------------
3269END MODULE lib_mpp
Note: See TracBrowser for help on using the repository browser.