fs/ocfs2/dlmglue.c

/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

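/*
 * A mask waiter lets a task sleep until a lockres reaches a given
 * flag state: lockres_set_flags() completes mw_complete for every
 * queued waiter whose (l_flags & mw_mask) == mw_goal condition holds.
 */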
struct ocfs2_mask_waiter {
    struct list_head mw_item;
    int mw_status;
    struct completion mw_complete;
    unsigned long mw_mask;
    unsigned long mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
    ktime_t mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
    UNBLOCK_CONTINUE = 0, /* Continue downconvert */
    UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire
                      * ->post_unlock callback */
    UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire
                      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
    int requeue;
    enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                    int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
                       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
                        int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
                     int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
                     const char *function,
                     unsigned int line,
                     struct ocfs2_lock_res *lockres)
{
    struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

    mlog(level, "LVB information for %s (called from %s:%u):\n",
         lockres->l_name, function, line);
    mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
         lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
         be32_to_cpu(lvb->lvb_igeneration));
    mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
         (unsigned long long)be64_to_cpu(lvb->lvb_isize),
         be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
         be16_to_cpu(lvb->lvb_imode));
    mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
         "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
         (long long)be64_to_cpu(lvb->lvb_iatime_packed),
         (long long)be64_to_cpu(lvb->lvb_ictime_packed),
         (long long)be64_to_cpu(lvb->lvb_imtime_packed),
         be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
    /*
     * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
     * this callback if ->l_priv is not an ocfs2_super pointer
     */
    struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

    /*
     * Optionally called in the downconvert thread after a
     * successful downconvert. The lockres will not be referenced
     * after this callback is called, so it is safe to free
     * memory, etc.
     *
     * The exact semantics of when this is called are controlled
     * by ->downconvert_worker()
     */
    void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

    /*
     * Allow a lock type to add checks to determine whether it is
     * safe to downconvert a lock. Return 0 to re-queue the
     * downconvert at a later time, nonzero to continue.
     *
     * For most locks, the default checks that there are no
     * incompatible holders are sufficient.
     *
     * Called with the lockres spinlock held.
     */
    int (*check_downconvert)(struct ocfs2_lock_res *, int);

    /*
     * Allows a lock type to populate the lock value block. This
     * is called on downconvert, and when we drop a lock.
     *
     * Locks that want to use this should set LOCK_TYPE_USES_LVB
     * in the flags field.
     *
     * Called with the lockres spinlock held.
     */
    void (*set_lvb)(struct ocfs2_lock_res *);

    /*
     * Called from the downconvert thread when it is determined
     * that a lock will be downconverted. This is called without
     * any locks held so the function can do work that might
     * schedule (syncing out data, etc).
     *
     * This should return any one of the ocfs2_unblock_action
     * values, depending on what it wants the thread to do.
     */
    int (*downconvert_worker)(struct ocfs2_lock_res *, int);

    /*
     * LOCK_TYPE_* flags which describe the specific requirements
     * of a lock type. Descriptions of each individual flag follow.
     */
    int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
    .get_osb = ocfs2_get_inode_osb,
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
    .get_osb = ocfs2_get_inode_osb,
    .check_downconvert = ocfs2_check_meta_downconvert,
    .set_lvb = ocfs2_set_meta_lvb,
    .downconvert_worker = ocfs2_data_convert_worker,
    .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
    .flags = LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
    .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
    .get_osb = ocfs2_get_dentry_osb,
    .post_unlock = ocfs2_dentry_post_unlock,
    .downconvert_worker = ocfs2_dentry_convert_worker,
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
    .get_osb = ocfs2_get_inode_osb,
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
    .get_osb = ocfs2_get_file_osb,
    .flags = 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
    .set_lvb = ocfs2_set_qinfo_lvb,
    .get_osb = ocfs2_get_qinfo_osb,
    .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
    .check_downconvert = ocfs2_check_refcount_downconvert,
    .downconvert_worker = ocfs2_refcount_convert_worker,
    .flags = 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
    return lockres->l_type == OCFS2_LOCK_TYPE_META ||
        lockres->l_type == OCFS2_LOCK_TYPE_RW ||
        lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
    return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!ocfs2_is_inode_lock(lockres));

    return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

    return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
    BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

    return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
    return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
    if (lockres->l_ops->get_osb)
        return lockres->l_ops->get_osb(lockres);

    return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
                 struct ocfs2_lock_res *lockres,
                 int level,
                 u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                             int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
                   struct ocfs2_lock_res *lockres,
                   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres,
                    int level)
{
    __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                        int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \
    if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \
        mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
             _err, _func, _lockres->l_name); \
    else \
        mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \
             _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \
             (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                    struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
                  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
                          int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                  struct ocfs2_lock_res *lockres,
                  int new_level,
                  int lvb,
                  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                  u64 blkno,
                  u32 generation,
                  char *name)
{
    int len;

    BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

    len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
               ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
               (long long)blkno, generation);

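    /*
     * One lock type character, the OCFS2_LOCK_ID_PAD string, a
     * 16-digit hex block number and an 8-digit hex generation always
     * fill the name to exactly OCFS2_LOCK_ID_MAX_LEN - 1 characters,
     * which the BUG_ON below asserts.
     */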
    BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

    mlog(0, "built lock resource with name: %s\n", name);
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
                       struct ocfs2_dlm_debug *dlm_debug)
{
    mlog(0, "Add tracking for lockres %s\n", res->l_name);

    spin_lock(&ocfs2_dlm_tracking_lock);
    list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
    spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
    spin_lock(&ocfs2_dlm_tracking_lock);
    if (!list_empty(&res->l_debug_list))
        list_del_init(&res->l_debug_list);
    spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
    res->l_lock_refresh = 0;
    memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
    memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
                    struct ocfs2_mask_waiter *mw, int ret)
{
    u32 usec;
    ktime_t kt;
    struct ocfs2_lock_stats *stats;

    if (level == LKM_PRMODE)
        stats = &res->l_lock_prmode;
    else if (level == LKM_EXMODE)
        stats = &res->l_lock_exmode;
    else
        return;

    kt = ktime_sub(ktime_get(), mw->mw_lock_start);
    usec = ktime_to_us(kt);

    stats->ls_gets++;
    stats->ls_total += ktime_to_ns(kt);
    /* ls_gets wrapped back to zero - restart the counters */
    if (unlikely(stats->ls_gets == 0)) {
        stats->ls_gets++;
        stats->ls_total = ktime_to_ns(kt);
    }

    if (stats->ls_max < usec)
        stats->ls_max = usec;

    if (ret)
        stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
    lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
    mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
               int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                       struct ocfs2_lock_res *res,
                       enum ocfs2_lock_type type,
                       struct ocfs2_lock_res_ops *ops,
                       void *priv)
{
    res->l_type = type;
    res->l_ops = ops;
    res->l_priv = priv;

    res->l_level = DLM_LOCK_IV;
    res->l_requested = DLM_LOCK_IV;
    res->l_blocking = DLM_LOCK_IV;
    res->l_action = OCFS2_AST_INVALID;
    res->l_unlock_action = OCFS2_UNLOCK_INVALID;

    res->l_flags = OCFS2_LOCK_INITIALIZED;

    ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

    ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    if (type != OCFS2_LOCK_TYPE_OPEN)
        lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
                 &lockdep_keys[type], 0);
    else
        res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
    /* This also clears out the lock status block */
    memset(res, 0, sizeof(struct ocfs2_lock_res));
    spin_lock_init(&res->l_lock);
    init_waitqueue_head(&res->l_event);
    INIT_LIST_HEAD(&res->l_blocked_list);
    INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                   enum ocfs2_lock_type type,
                   unsigned int generation,
                   struct inode *inode)
{
    struct ocfs2_lock_res_ops *ops;

    switch(type) {
        case OCFS2_LOCK_TYPE_RW:
            ops = &ocfs2_inode_rw_lops;
            break;
        case OCFS2_LOCK_TYPE_META:
            ops = &ocfs2_inode_inode_lops;
            break;
        case OCFS2_LOCK_TYPE_OPEN:
            ops = &ocfs2_inode_open_lops;
            break;
        default:
            mlog_bug_on_msg(1, "type: %d\n", type);
            ops = NULL; /* thanks, gcc */
            break;
    }

    ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
                  generation, res->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
    struct inode *inode = ocfs2_lock_res_inode(lockres);

    return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_mem_dqinfo *info = lockres->l_priv;

    return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_file_private *fp = lockres->l_priv;

    return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
    __be64 inode_blkno_be;

    memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
           sizeof(__be64));

    return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
    struct ocfs2_dentry_lock *dl = lockres->l_priv;

    return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
                u64 parent, struct inode *inode)
{
    int len;
    u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
    __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
    struct ocfs2_lock_res *lockres = &dl->dl_lockres;

    ocfs2_lock_res_init_once(lockres);

    /*
     * Unfortunately, the standard lock naming scheme won't work
     * here because we have two 16 byte values to use. Instead,
     * we'll stuff the inode number as a binary value. We still
     * want error prints to show something without garbling the
     * display, so drop a null byte in there before the inode
     * number. A future version of OCFS2 will likely use all
     * binary lock names. The stringified names have been a
     * tremendous aid in debugging, but now that the debugfs
     * interface exists, we can mangle things there if need be.
     *
     * NOTE: We also drop the standard "pad" value (the total lock
     * name size stays the same though - the last part is all
     * zeros due to the memset in ocfs2_lock_res_init_once()
     */
    len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
               "%c%016llx",
               ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
               (long long)parent);

    BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

    memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
           sizeof(__be64));

    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
                   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
                      struct ocfs2_super *osb)
{
    /* Superblock lockres doesn't come from a slab so we call init
     * once on it manually. */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
                  0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
                   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                       struct ocfs2_super *osb)
{
    /* Rename lockres doesn't come from a slab so we call init
     * once on it manually. */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
                     struct ocfs2_super *osb)
{
    /* nfs_sync lockres doesn't come from a slab so we call init
     * once on it manually. */
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
                   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
                        struct ocfs2_super *osb)
{
    ocfs2_lock_res_init_once(res);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
    ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
                   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                  struct ocfs2_file_private *fp)
{
    struct inode *inode = fp->fp_file->f_mapping->host;
    struct ocfs2_inode_info *oi = OCFS2_I(inode);

    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
                  inode->i_generation, lockres->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
                   fp);
    lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
                   struct ocfs2_mem_dqinfo *info)
{
    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
                  0, lockres->l_name);
    ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
                   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
                   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
                  struct ocfs2_super *osb, u64 ref_blkno,
                  unsigned int generation)
{
    ocfs2_lock_res_init_once(lockres);
    ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
                  generation, lockres->l_name);
    ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
                   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
    if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
        return;

    ocfs2_remove_lockres_tracking(res);

    mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
            "Lockres %s is on the blocked list\n",
            res->l_name);
    mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
            "Lockres %s has mask waiters pending\n",
            res->l_name);
    mlog_bug_on_msg(spin_is_locked(&res->l_lock),
            "Lockres %s is locked\n",
            res->l_name);
    mlog_bug_on_msg(res->l_ro_holders,
            "Lockres %s has %u ro holders\n",
            res->l_name, res->l_ro_holders);
    mlog_bug_on_msg(res->l_ex_holders,
            "Lockres %s has %u ex holders\n",
            res->l_name, res->l_ex_holders);

    /* Need to clear out the lock status block for the dlm */
    memset(&res->l_lksb, 0, sizeof(res->l_lksb));

    res->l_flags = 0UL;
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
                     int level)
{
    BUG_ON(!lockres);

    switch(level) {
    case DLM_LOCK_EX:
        lockres->l_ex_holders++;
        break;
    case DLM_LOCK_PR:
        lockres->l_ro_holders++;
        break;
    default:
        BUG();
    }
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
                     int level)
{
    BUG_ON(!lockres);

    switch(level) {
    case DLM_LOCK_EX:
        BUG_ON(!lockres->l_ex_holders);
        lockres->l_ex_holders--;
        break;
    case DLM_LOCK_PR:
        BUG_ON(!lockres->l_ro_holders);
        lockres->l_ro_holders--;
        break;
    default:
        BUG();
    }
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
    int new_level = DLM_LOCK_EX;

    if (level == DLM_LOCK_EX)
        new_level = DLM_LOCK_NL;
    else if (level == DLM_LOCK_PR)
        new_level = DLM_LOCK_PR;
    return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                  unsigned long newflags)
{
    struct ocfs2_mask_waiter *mw, *tmp;

    assert_spin_locked(&lockres->l_lock);

    lockres->l_flags = newflags;

    list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
        if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
            continue;

        list_del_init(&mw->mw_item);
        mw->mw_status = 0;
        complete(&mw->mw_complete);
    }
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
    lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
                unsigned long clear)
{
    lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

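/*
 * An AST told us our downconvert completed: drop to the requested
 * level and, if that level no longer conflicts with the blocking
 * request, clear OCFS2_LOCK_BLOCKED along with BUSY.
 */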
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
    BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

    lockres->l_level = lockres->l_requested;
    if (lockres->l_level <=
        ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
        lockres->l_blocking = DLM_LOCK_NL;
        lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
    }
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

    /* Convert from RO to EX doesn't really need anything as our
     * information is already up to date. Convert from NL to
     * *anything* however should mark ourselves as needing an
     * update */
    if (lockres->l_level == DLM_LOCK_NL &&
        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

    lockres->l_level = lockres->l_requested;

    /*
     * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
     * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
     * downconverting the lock before the upconvert has fully completed.
     */
    lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
    BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
    BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

    if (lockres->l_requested > DLM_LOCK_NL &&
        !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
        lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
        lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

    lockres->l_level = lockres->l_requested;
    lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

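/*
 * Handle a blocking AST: record the highest level another node is
 * waiting on and decide whether a downconvert must be scheduled.
 * Returns nonzero if the caller should queue the lockres for the
 * downconvert thread.
 */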
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                     int level)
{
    int needs_downconvert = 0;

    assert_spin_locked(&lockres->l_lock);

    if (level > lockres->l_blocking) {
        /* only schedule a downconvert if we haven't already scheduled
         * one that goes low enough to satisfy the level we're
         * blocking. this also catches the case where we get
         * duplicate BASTs */
        if (ocfs2_highest_compat_lock_level(level) <
            ocfs2_highest_compat_lock_level(lockres->l_blocking))
            needs_downconvert = 1;

        lockres->l_blocking = level;
    }

    mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
         lockres->l_name, level, lockres->l_level, lockres->l_blocking,
         needs_downconvert);

    if (needs_downconvert)
        lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
    mlog(0, "needs_downconvert = %d\n", needs_downconvert);
    return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 *  ocfs2_meta_lock()
 *   ocfs2_cluster_lock()
 *    set BUSY
 *    set PENDING
 *    drop l_lock
 *    ocfs2_dlm_lock()
 *     ocfs2_locking_ast()           ocfs2_downconvert_thread()
 *      clear PENDING                 ocfs2_unblock_lock()
 *                                     take l_lock
 *                                     !BUSY
 *                                     ocfs2_prepare_downconvert()
 *                                      set BUSY
 *                                      set PENDING
 *                                     drop l_lock
 *      take l_lock
 *      clear PENDING
 *      drop l_lock
 *   <window>
 *                                     ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
                    unsigned int generation,
                    struct ocfs2_super *osb)
{
    assert_spin_locked(&lockres->l_lock);

    /*
     * The ast and locking functions can race us here. The winner
     * will clear pending, the loser will not.
     */
    if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
        (lockres->l_pending_gen != generation))
        return;

    lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
    lockres->l_pending_gen++;

    /*
     * The downconvert thread may have skipped us because we
     * were PENDING. Wake it up.
     */
    if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
        ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
                  unsigned int generation,
                  struct ocfs2_super *osb)
{
    unsigned long flags;

    spin_lock_irqsave(&lockres->l_lock, flags);
    __lockres_clear_pending(lockres, generation, osb);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

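/*
 * Mark a new lock call as in flight. Must be called under l_lock with
 * OCFS2_LOCK_BUSY already set; the returned generation is what the
 * caller later passes to lockres_clear_pending().
 */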
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
    assert_spin_locked(&lockres->l_lock);
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

    lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

    return lockres->l_pending_gen;
}

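/*
 * Blocking AST handler: another node wants a level that conflicts
 * with ours. Schedule the lockres for the downconvert thread unless
 * the lock is NOCACHE and will be dropped shortly anyway.
 */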
static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
    int needs_downconvert;
    unsigned long flags;

    BUG_ON(level <= DLM_LOCK_NL);

    mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
         "type %s\n", lockres->l_name, level, lockres->l_level,
         ocfs2_lock_type_string(lockres->l_type));

    /*
     * We can skip the bast for locks which don't enable caching -
     * they'll be dropped at the earliest possible time anyway.
     */
    if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
        return;

    spin_lock_irqsave(&lockres->l_lock, flags);
    needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
    if (needs_downconvert)
        ocfs2_schedule_blocked_lock(osb, lockres);
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    wake_up(&lockres->l_event);

    ocfs2_wake_downconvert_thread(osb);
}

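/*
 * Regular AST handler: fires when our attach, convert or downconvert
 * request completes, or when the DLM reports -EAGAIN for a request it
 * could not grant immediately. Dispatches on l_action and clears
 * BUSY/PENDING so waiters can make progress.
 */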
static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
    unsigned long flags;
    int status;

    spin_lock_irqsave(&lockres->l_lock, flags);

    status = ocfs2_dlm_lock_status(&lockres->l_lksb);

    if (status == -EAGAIN) {
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        goto out;
    }

    if (status) {
        mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
             lockres->l_name, status);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        return;
    }

    mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
         "level %d => %d\n", lockres->l_name, lockres->l_action,
         lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

    switch(lockres->l_action) {
    case OCFS2_AST_ATTACH:
        ocfs2_generic_handle_attach_action(lockres);
        lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
        break;
    case OCFS2_AST_CONVERT:
        ocfs2_generic_handle_convert_action(lockres);
        break;
    case OCFS2_AST_DOWNCONVERT:
        ocfs2_generic_handle_downconvert_action(lockres);
        break;
    default:
        mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
             "flags 0x%lx, unlock: %u\n",
             lockres->l_name, lockres->l_action, lockres->l_flags,
             lockres->l_unlock_action);
        BUG();
    }
out:
    /* set it to something invalid so if we get called again we
     * can catch it. */
    lockres->l_action = OCFS2_AST_INVALID;

    /* Did we try to cancel this lock? Clear that state */
    if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

    /*
     * We may have beaten the locking functions here. We certainly
     * know that dlm_lock() has been called :-)
     * Because we can't have two lock calls in flight at once, we
     * can use lockres->l_pending_gen.
     */
    __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

    wake_up(&lockres->l_event);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

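/*
 * Unlock AST handler: fires when a cancel-convert or a full unlock
 * completes. A successful cancel invalidates the original convert
 * action; a drop resets the lock level to DLM_LOCK_IV.
 */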
static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
    struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
    unsigned long flags;

    mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
         lockres->l_name, lockres->l_unlock_action);

    spin_lock_irqsave(&lockres->l_lock, flags);
    if (error) {
        mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
             "unlock_action %d\n", error, lockres->l_name,
             lockres->l_unlock_action);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        return;
    }

    switch(lockres->l_unlock_action) {
    case OCFS2_UNLOCK_CANCEL_CONVERT:
        mlog(0, "Cancel convert success for %s\n", lockres->l_name);
        lockres->l_action = OCFS2_AST_INVALID;
        /* Downconvert thread may have requeued this lock, we
         * need to wake it. */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
            ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
        break;
    case OCFS2_UNLOCK_DROP_LOCK:
        lockres->l_level = DLM_LOCK_IV;
        break;
    default:
        BUG();
    }

    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
    wake_up(&lockres->l_event);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol. It provides the lock handling
 * hooks for the underlying DLM. It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed. The protocol is negotiated when joining
 * the dlm domain. A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes. When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero. If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased. If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
    .lp_max_version = {
        .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
        .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
    },
    .lp_lock_ast = ocfs2_locking_ast,
    .lp_blocking_ast = ocfs2_blocking_ast,
    .lp_unlock_ast = ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
    ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

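/*
 * Undo the BUSY/PENDING state set up before a failed call into the
 * DLM so waiters don't hang: 'convert' nonzero rolls back a failed
 * lock/convert attempt, zero a failed unlock/cancel attempt.
 */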
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                        int convert)
{
    unsigned long flags;

    spin_lock_irqsave(&lockres->l_lock, flags);
    lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
    lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
    if (convert)
        lockres->l_action = OCFS2_AST_INVALID;
    else
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                 struct ocfs2_lock_res *lockres,
                 int level,
                 u32 dlm_flags)
{
    int ret = 0;
    unsigned long flags;
    unsigned int gen;

    mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
         dlm_flags);

    spin_lock_irqsave(&lockres->l_lock, flags);
    if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
        (lockres->l_flags & OCFS2_LOCK_BUSY)) {
        spin_unlock_irqrestore(&lockres->l_lock, flags);
        goto bail;
    }

    lockres->l_action = OCFS2_AST_ATTACH;
    lockres->l_requested = level;
    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
    gen = lockres_set_pending(lockres);
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    ret = ocfs2_dlm_lock(osb->cconn,
                 level,
                 &lockres->l_lksb,
                 dlm_flags,
                 lockres->l_name,
                 OCFS2_LOCK_ID_MAX_LEN - 1);
    lockres_clear_pending(lockres, gen, osb);
    if (ret) {
        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
        ocfs2_recover_from_dlm_error(lockres, 1);
    }

    mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
    return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
                    int flag)
{
    unsigned long flags;
    int ret;

    spin_lock_irqsave(&lockres->l_lock, flags);
    ret = lockres->l_flags & flag;
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
    wait_event(lockres->l_event,
           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
    wait_event(lockres->l_event,
           !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                             int wanted)
{
    BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

    return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
    INIT_LIST_HEAD(&mw->mw_item);
    init_completion(&mw->mw_complete);
    ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
    wait_for_completion(&mw->mw_complete);
    /* Re-arm the completion in case we want to wait on it again */
    INIT_COMPLETION(mw->mw_complete);
    return mw->mw_status;
}

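/*
 * Queue a mask waiter on the lockres; lockres_set_flags() will
 * complete it once (l_flags & mask) == goal. Caller holds l_lock.
 */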
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
                    struct ocfs2_mask_waiter *mw,
                    unsigned long mask,
                    unsigned long goal)
{
    BUG_ON(!list_empty(&mw->mw_item));

    assert_spin_locked(&lockres->l_lock);

    list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
    mw->mw_mask = mask;
    mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
                      struct ocfs2_mask_waiter *mw)
{
    unsigned long flags;
    int ret = 0;

    spin_lock_irqsave(&lockres->l_lock, flags);
    if (!list_empty(&mw->mw_item)) {
        if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
            ret = -EBUSY;

        list_del_init(&mw->mw_item);
        init_completion(&mw->mw_complete);
    }
    spin_unlock_irqrestore(&lockres->l_lock, flags);

    return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
                         struct ocfs2_lock_res *lockres)
{
    int ret;

    ret = wait_for_completion_interruptible(&mw->mw_complete);
    if (ret)
        lockres_remove_mask_waiter(lockres, mw);
    else
        ret = mw->mw_status;
    /* Re-arm the completion in case we want to wait on it again */
    INIT_COMPLETION(mw->mw_complete);
    return ret;
}

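/*
 * Core lock acquisition: take (or upconvert to) 'level' on behalf of
 * a new holder. Loops until the granted level satisfies the request,
 * parking on a mask waiter whenever the lockres is BUSY or BLOCKED,
 * and returns -EAGAIN instead of sleeping when DLM_LKF_NOQUEUE or
 * OCFS2_LOCK_NONBLOCK semantics apply.
 */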
static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
                struct ocfs2_lock_res *lockres,
                int level,
                u32 lkm_flags,
                int arg_flags,
                int l_subclass,
                unsigned long caller_ip)
{
    struct ocfs2_mask_waiter mw;
    int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
    int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
    unsigned long flags;
    unsigned int gen;
    int noqueue_attempted = 0;

    ocfs2_init_mask_waiter(&mw);

    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
        lkm_flags |= DLM_LKF_VALBLK;

again:
    wait = 0;

    spin_lock_irqsave(&lockres->l_lock, flags);

    if (catch_signals && signal_pending(current)) {
        ret = -ERESTARTSYS;
        goto unlock;
    }

    mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
            "Cluster lock called on freeing lockres %s! flags "
            "0x%lx\n", lockres->l_name, lockres->l_flags);

    /* We only compare against the currently granted level
     * here. If the lock is blocked waiting on a downconvert,
     * we'll get caught below. */
    if (lockres->l_flags & OCFS2_LOCK_BUSY &&
        level > lockres->l_level) {
        /* is someone sitting in dlm_lock? If so, wait on
         * them. */
        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
        wait = 1;
        goto unlock;
    }

    if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
        /*
         * We've upconverted. If the lock now has a level we can
         * work with, we take it. If, however, the lock is not at the
         * required level, we go thru the full cycle. One way this could
         * happen is if a process requesting an upconvert to PR is
         * closely followed by another requesting upconvert to an EX.
         * If the process requesting EX lands here, we want it to
         * continue attempting to upconvert and let the process
         * requesting PR take the lock.
         * If multiple processes request upconvert to PR, the first one
         * here will take the lock. The others will have to go thru the
         * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
         * downconvert request.
         */
        if (level <= lockres->l_level)
            goto update_holders;
    }

    if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
        !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
        /* the lock is currently blocked on behalf of
         * another node */
        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
        wait = 1;
        goto unlock;
    }

    if (level > lockres->l_level) {
        if (noqueue_attempted > 0) {
            ret = -EAGAIN;
            goto unlock;
        }
        if (lkm_flags & DLM_LKF_NOQUEUE)
            noqueue_attempted = 1;

        if (lockres->l_action != OCFS2_AST_INVALID)
            mlog(ML_ERROR, "lockres %s has action %u pending\n",
                 lockres->l_name, lockres->l_action);

        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
            lockres->l_action = OCFS2_AST_ATTACH;
            lkm_flags &= ~DLM_LKF_CONVERT;
        } else {
            lockres->l_action = OCFS2_AST_CONVERT;
            lkm_flags |= DLM_LKF_CONVERT;
        }

        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        BUG_ON(level == DLM_LOCK_IV);
        BUG_ON(level == DLM_LOCK_NL);

        mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
             lockres->l_name, lockres->l_level, level);

        /* call dlm_lock to upgrade lock now */
        ret = ocfs2_dlm_lock(osb->cconn,
                     level,
                     &lockres->l_lksb,
                     lkm_flags,
                     lockres->l_name,
                     OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
            if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
                (ret != -EAGAIN)) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock",
                            ret, lockres);
            }
            ocfs2_recover_from_dlm_error(lockres, 1);
            goto out;
        }

        mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
             lockres->l_name);

        /* At this point we've gone inside the dlm and need to
         * complete our work regardless. */
        catch_signals = 0;

        /* wait for busy to clear and carry on */
        goto again;
    }

update_holders:
    /* Ok, if we get here then we're good to go. */
    ocfs2_inc_holders(lockres, level);

    ret = 0;
unlock:
    lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

    spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
    /*
     * This is helping work around a lock inversion between the page lock
     * and dlm locks. One path holds the page lock while calling aops
     * which block acquiring dlm locks. The voting thread holds dlm
     * locks while acquiring page locks while down converting data locks.
     * This block is helping an aop path notice the inversion and back
     * off to unlock its page lock before trying the dlm lock again.
     */
    if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
        mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
        wait = 0;
        if (lockres_remove_mask_waiter(lockres, &mw))
            ret = -EAGAIN;
        else
            goto again;
    }
    if (wait) {
        ret = ocfs2_wait_for_mask(&mw);
        if (ret == 0)
            goto again;
        mlog_errno(ret);
    }
    ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
    if (!ret && lockres->l_lockdep_map.key != NULL) {
        if (level == DLM_LOCK_PR)
            rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
                !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
                caller_ip);
        else
            rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
                !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
                caller_ip);
    }
#endif
    return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
                     struct ocfs2_lock_res *lockres,
                     int level,
                     u32 lkm_flags,
                     int arg_flags)
{
    return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
                    0, _RET_IP_);
}

static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
                   struct ocfs2_lock_res *lockres,
                   int level,
                   unsigned long caller_ip)
{
    unsigned long flags;

    spin_lock_irqsave(&lockres->l_lock, flags);
    ocfs2_dec_holders(lockres, level);
    ocfs2_downconvert_on_unlock(osb, lockres);
    spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    if (lockres->l_lockdep_map.key != NULL)
        rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
}

1573static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1574                 struct ocfs2_lock_res *lockres,
1575                 int ex,
1576                 int local)
1577{
1578    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1579    unsigned long flags;
1580    u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1581
1582    spin_lock_irqsave(&lockres->l_lock, flags);
1583    BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1584    lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1585    spin_unlock_irqrestore(&lockres->l_lock, flags);
1586
1587    return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1588}
1589
1590/* Grants us an EX lock on the data and metadata resources, skipping
1591 * the normal cluster directory lookup. Use this ONLY on newly created
1592 * inodes which other nodes can't possibly see, and which haven't been
1593 * hashed in the inode hash yet. This can give us a good performance
1594 * increase as it'll skip the network broadcast normally associated
1595 * with creating a new lock resource. */
1596int ocfs2_create_new_inode_locks(struct inode *inode)
1597{
1598    int ret;
1599    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1600
1601    BUG_ON(!inode);
1602    BUG_ON(!ocfs2_inode_is_new(inode));
1603
1604    mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1605
    /* NOTE: We don't increment any of the holder counts, nor
     * do we add anything to a journal handle. Since this is
     * supposed to be a new inode which the cluster doesn't know
     * about yet, there is no need to. As far as the LVB handling
     * is concerned, this is basically like acquiring an EX lock
     * on a resource which has an invalid LVB -- we'll set it
     * valid when we release the EX. */
1613
1614    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1615    if (ret) {
1616        mlog_errno(ret);
1617        goto bail;
1618    }
1619
    /*
     * We don't want to use DLM_LKF_LOCAL on metadata locks, as they
     * don't use a generation in their lock names.
     */
1624    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1625    if (ret) {
1626        mlog_errno(ret);
1627        goto bail;
1628    }
1629
1630    ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1631    if (ret) {
1632        mlog_errno(ret);
1633        goto bail;
1634    }
1635
1636bail:
1637    return ret;
1638}
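
/*
 * A minimal sketch of the intended usage (illustrative only; the real
 * call sites live in the inode-creation paths): take these locks while
 * the new inode is still invisible to the rest of the cluster, before
 * it gets hashed.
 *
 *    status = ocfs2_create_new_inode_locks(inode);
 *    if (status < 0)
 *        mlog_errno(status);
 */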
1639
1640int ocfs2_rw_lock(struct inode *inode, int write)
1641{
1642    int status, level;
1643    struct ocfs2_lock_res *lockres;
1644    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1645
1646    BUG_ON(!inode);
1647
1648    mlog(0, "inode %llu take %s RW lock\n",
1649         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1650         write ? "EXMODE" : "PRMODE");
1651
1652    if (ocfs2_mount_local(osb))
1653        return 0;
1654
1655    lockres = &OCFS2_I(inode)->ip_rw_lockres;
1656
1657    level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1658
1659    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1660                    0);
1661    if (status < 0)
1662        mlog_errno(status);
1663
1664    return status;
1665}
1666
1667void ocfs2_rw_unlock(struct inode *inode, int write)
1668{
1669    int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1670    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1671    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1672
1673    mlog(0, "inode %llu drop %s RW lock\n",
1674         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1675         write ? "EXMODE" : "PRMODE");
1676
1677    if (!ocfs2_mount_local(osb))
1678        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1679}
1680
/*
 * ocfs2_open_lock always gets a PR mode lock.
 */
1684int ocfs2_open_lock(struct inode *inode)
1685{
1686    int status = 0;
1687    struct ocfs2_lock_res *lockres;
1688    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1689
1690    BUG_ON(!inode);
1691
1692    mlog(0, "inode %llu take PRMODE open lock\n",
1693         (unsigned long long)OCFS2_I(inode)->ip_blkno);
1694
1695    if (ocfs2_mount_local(osb))
1696        goto out;
1697
1698    lockres = &OCFS2_I(inode)->ip_open_lockres;
1699
1700    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1701                    DLM_LOCK_PR, 0, 0);
1702    if (status < 0)
1703        mlog_errno(status);
1704
1705out:
1706    return status;
1707}
1708
1709int ocfs2_try_open_lock(struct inode *inode, int write)
1710{
1711    int status = 0, level;
1712    struct ocfs2_lock_res *lockres;
1713    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1714
1715    BUG_ON(!inode);
1716
1717    mlog(0, "inode %llu try to take %s open lock\n",
1718         (unsigned long long)OCFS2_I(inode)->ip_blkno,
1719         write ? "EXMODE" : "PRMODE");
1720
1721    if (ocfs2_mount_local(osb))
1722        goto out;
1723
1724    lockres = &OCFS2_I(inode)->ip_open_lockres;
1725
1726    level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1727
    /*
     * The file system may already be holding a PRMODE/EXMODE open lock.
     * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
     * other nodes, and -EAGAIN will indicate to the caller that
     * this inode is still in use.
     */
1734    status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1735                    level, DLM_LKF_NOQUEUE, 0);
1736
1737out:
1738    return status;
1739}
1740
/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
 */
1744void ocfs2_open_unlock(struct inode *inode)
1745{
1746    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1747    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1748
1749    mlog(0, "inode %llu drop open lock\n",
1750         (unsigned long long)OCFS2_I(inode)->ip_blkno);
1751
1752    if (ocfs2_mount_local(osb))
1753        goto out;
1754
    if (lockres->l_ro_holders)
        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
                     DLM_LOCK_PR);
    if (lockres->l_ex_holders)
        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
                     DLM_LOCK_EX);
1761
1762out:
1763    return;
1764}
1765
1766static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1767                     int level)
1768{
1769    int ret;
1770    struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1771    unsigned long flags;
1772    struct ocfs2_mask_waiter mw;
1773
1774    ocfs2_init_mask_waiter(&mw);
1775
1776retry_cancel:
1777    spin_lock_irqsave(&lockres->l_lock, flags);
1778    if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1779        ret = ocfs2_prepare_cancel_convert(osb, lockres);
1780        if (ret) {
1781            spin_unlock_irqrestore(&lockres->l_lock, flags);
1782            ret = ocfs2_cancel_convert(osb, lockres);
1783            if (ret < 0) {
1784                mlog_errno(ret);
1785                goto out;
1786            }
1787            goto retry_cancel;
1788        }
1789        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1790        spin_unlock_irqrestore(&lockres->l_lock, flags);
1791
1792        ocfs2_wait_for_mask(&mw);
1793        goto retry_cancel;
1794    }
1795
1796    ret = -ERESTARTSYS;
    /*
     * We may still have gotten the lock, in which case there's no
     * point in restarting the syscall.
     */
1801    if (lockres->l_level == level)
1802        ret = 0;
1803
1804    mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1805         lockres->l_flags, lockres->l_level, lockres->l_action);
1806
1807    spin_unlock_irqrestore(&lockres->l_lock, flags);
1808
1809out:
1810    return ret;
1811}
1812
/*
 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 * flock() calls. The locking approach this requires is sufficiently
 * different from all other cluster lock types that we implement a
 * separate path to the "low-level" dlm calls. In particular:
 *
 * - No optimization of lock levels is done - we take exactly
 * what's been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 * no-lock at unlock time. This also means flock locks never go on
 * the blocking list.
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 * sure to allow cancellation of a misbehaving application's flock()
 * request.
 *
 * - Access to a flock lockres is never concurrent - we simplify the
 * code by requiring the caller to guarantee serialization of dlmglue
 * flock calls.
 */
1834int ocfs2_file_lock(struct file *file, int ex, int trylock)
1835{
1836    int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1837    unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1838    unsigned long flags;
1839    struct ocfs2_file_private *fp = file->private_data;
1840    struct ocfs2_lock_res *lockres = &fp->fp_flock;
1841    struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1842    struct ocfs2_mask_waiter mw;
1843
1844    ocfs2_init_mask_waiter(&mw);
1845
1846    if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1847        (lockres->l_level > DLM_LOCK_NL)) {
1848        mlog(ML_ERROR,
1849             "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1850             "level: %u\n", lockres->l_name, lockres->l_flags,
1851             lockres->l_level);
1852        return -EINVAL;
1853    }
1854
1855    spin_lock_irqsave(&lockres->l_lock, flags);
1856    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1857        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1858        spin_unlock_irqrestore(&lockres->l_lock, flags);
1859
1860        /*
1861         * Get the lock at NLMODE to start - that way we
1862         * can cancel the upconvert request if need be.
1863         */
1864        ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1865        if (ret < 0) {
1866            mlog_errno(ret);
1867            goto out;
1868        }
1869
1870        ret = ocfs2_wait_for_mask(&mw);
1871        if (ret) {
1872            mlog_errno(ret);
1873            goto out;
1874        }
1875        spin_lock_irqsave(&lockres->l_lock, flags);
1876    }
1877
1878    lockres->l_action = OCFS2_AST_CONVERT;
1879    lkm_flags |= DLM_LKF_CONVERT;
1880    lockres->l_requested = level;
1881    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1882
1883    lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1884    spin_unlock_irqrestore(&lockres->l_lock, flags);
1885
1886    ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1887                 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
1888    if (ret) {
1889        if (!trylock || (ret != -EAGAIN)) {
1890            ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1891            ret = -EINVAL;
1892        }
1893
1894        ocfs2_recover_from_dlm_error(lockres, 1);
1895        lockres_remove_mask_waiter(lockres, &mw);
1896        goto out;
1897    }
1898
1899    ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1900    if (ret == -ERESTARTSYS) {
        /*
         * Userspace can deadlock itself with flock(). The local
         * behavior is to allow the deadlock, but abort the system
         * call if a signal is received. We follow this example, since
         * otherwise a poorly written program could sit in the kernel
         * until reboot.
         *
         * Handling this is a bit more complicated for OCFS2
         * though. We can't exit this function with an
         * outstanding lock request, so a cancel convert is
         * required. We intentionally overwrite 'ret' - if the
         * cancel fails and the lock was granted, it's easier
         * to just bubble success back up to the user.
         */
1916        ret = ocfs2_flock_handle_signal(lockres, level);
1917    } else if (!ret && (level > lockres->l_level)) {
1918        /* Trylock failed asynchronously */
1919        BUG_ON(!trylock);
1920        ret = -EAGAIN;
1921    }
1922
1923out:
1924
1925    mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1926         lockres->l_name, ex, trylock, ret);
1927    return ret;
1928}
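
/*
 * Illustrative example of the trylock semantics above (a sketch, not a
 * real call site; the flock() handling in locks.c is the actual caller):
 *
 *    ret = ocfs2_file_lock(file, 1, 1);    (ex = 1, trylock = 1)
 *    if (ret == -EAGAIN)
 *        a conflicting holder exists - report EWOULDBLOCK to userspace
 */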
1929
1930void ocfs2_file_unlock(struct file *file)
1931{
1932    int ret;
1933    unsigned int gen;
1934    unsigned long flags;
1935    struct ocfs2_file_private *fp = file->private_data;
1936    struct ocfs2_lock_res *lockres = &fp->fp_flock;
1937    struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1938    struct ocfs2_mask_waiter mw;
1939
1940    ocfs2_init_mask_waiter(&mw);
1941
1942    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1943        return;
1944
1945    if (lockres->l_level == DLM_LOCK_NL)
1946        return;
1947
1948    mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1949         lockres->l_name, lockres->l_flags, lockres->l_level,
1950         lockres->l_action);
1951
1952    spin_lock_irqsave(&lockres->l_lock, flags);
1953    /*
1954     * Fake a blocking ast for the downconvert code.
1955     */
1956    lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1957    lockres->l_blocking = DLM_LOCK_EX;
1958
1959    gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
1960    lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1961    spin_unlock_irqrestore(&lockres->l_lock, flags);
1962
1963    ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
1964    if (ret) {
1965        mlog_errno(ret);
1966        return;
1967    }
1968
1969    ret = ocfs2_wait_for_mask(&mw);
1970    if (ret)
1971        mlog_errno(ret);
1972}
1973
1974static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1975                    struct ocfs2_lock_res *lockres)
1976{
1977    int kick = 0;
1978
    /* If we know that another node is waiting on our lock, kick
     * the downconvert thread pre-emptively when we reach a release
     * condition. */
1982    if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
        switch (lockres->l_blocking) {
1984        case DLM_LOCK_EX:
1985            if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1986                kick = 1;
1987            break;
1988        case DLM_LOCK_PR:
1989            if (!lockres->l_ex_holders)
1990                kick = 1;
1991            break;
1992        default:
1993            BUG();
1994        }
1995    }
1996
1997    if (kick)
1998        ocfs2_wake_downconvert_thread(osb);
1999}
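
/*
 * For example (hypothetical scenario): if another node has asked for EX
 * and we just dropped our last holder, the lock is now free to be
 * downconverted, so we wake the downconvert thread immediately rather
 * than leaving the remote request waiting.
 */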
2000
2001#define OCFS2_SEC_BITS 34
#define OCFS2_SEC_SHIFT (64 - OCFS2_SEC_BITS)
2003#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1)
2004
/* The LVB only has room for 64 bits of time, so we pack the
 * timespec for now. */
2007static u64 ocfs2_pack_timespec(struct timespec *spec)
2008{
2009    u64 res;
2010    u64 sec = spec->tv_sec;
2011    u32 nsec = spec->tv_nsec;
2012
2013    res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2014
2015    return res;
2016}
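
/*
 * Layout sketch of the packed value (OCFS2_SEC_SHIFT works out to 30):
 *
 *    bits 63..30: tv_sec  (34 bits)
 *    bits 29..0:  tv_nsec (30 bits)
 *
 * e.g. tv_sec = 1, tv_nsec = 1 packs to (1ULL << 30) | 1 = 0x40000001,
 * which ocfs2_unpack_timespec() below splits back into both fields.
 */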
2017
2018/* Call this with the lockres locked. I am reasonably sure we don't
2019 * need ip_lock in this function as anyone who would be changing those
2020 * values is supposed to be blocked in ocfs2_inode_lock right now. */
2021static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2022{
2023    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2024    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2025    struct ocfs2_meta_lvb *lvb;
2026
2027    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2028
2029    /*
2030     * Invalidate the LVB of a deleted inode - this way other
2031     * nodes are forced to go to disk and discover the new inode
2032     * status.
2033     */
2034    if (oi->ip_flags & OCFS2_INODE_DELETED) {
2035        lvb->lvb_version = 0;
2036        goto out;
2037    }
2038
2039    lvb->lvb_version = OCFS2_LVB_VERSION;
2040    lvb->lvb_isize = cpu_to_be64(i_size_read(inode));
2041    lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2042    lvb->lvb_iuid = cpu_to_be32(inode->i_uid);
2043    lvb->lvb_igid = cpu_to_be32(inode->i_gid);
2044    lvb->lvb_imode = cpu_to_be16(inode->i_mode);
2045    lvb->lvb_inlink = cpu_to_be16(inode->i_nlink);
2046    lvb->lvb_iatime_packed =
2047        cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
2048    lvb->lvb_ictime_packed =
2049        cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2050    lvb->lvb_imtime_packed =
2051        cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2052    lvb->lvb_iattr = cpu_to_be32(oi->ip_attr);
2053    lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2054    lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2055
2056out:
2057    mlog_meta_lvb(0, lockres);
2058}
2059
2060static void ocfs2_unpack_timespec(struct timespec *spec,
2061                  u64 packed_time)
2062{
2063    spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2064    spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2065}
2066
2067static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2068{
2069    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2070    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2071    struct ocfs2_meta_lvb *lvb;
2072
2073    mlog_meta_lvb(0, lockres);
2074
2075    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2076
2077    /* We're safe here without the lockres lock... */
2078    spin_lock(&oi->ip_lock);
2079    oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2080    i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2081
2082    oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2083    oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2084    ocfs2_set_inode_flags(inode);
2085
2086    /* fast-symlinks are a special case */
2087    if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2088        inode->i_blocks = 0;
2089    else
2090        inode->i_blocks = ocfs2_inode_sector_count(inode);
2091
2092    inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
2093    inode->i_gid = be32_to_cpu(lvb->lvb_igid);
2094    inode->i_mode = be16_to_cpu(lvb->lvb_imode);
2095    inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
2096    ocfs2_unpack_timespec(&inode->i_atime,
2097                  be64_to_cpu(lvb->lvb_iatime_packed));
2098    ocfs2_unpack_timespec(&inode->i_mtime,
2099                  be64_to_cpu(lvb->lvb_imtime_packed));
2100    ocfs2_unpack_timespec(&inode->i_ctime,
2101                  be64_to_cpu(lvb->lvb_ictime_packed));
2102    spin_unlock(&oi->ip_lock);
2103}
2104
2105static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2106                          struct ocfs2_lock_res *lockres)
2107{
2108    struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2109
2110    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2111        && lvb->lvb_version == OCFS2_LVB_VERSION
2112        && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2113        return 1;
2114    return 0;
2115}
2116
2117/* Determine whether a lock resource needs to be refreshed, and
2118 * arbitrate who gets to refresh it.
2119 *
2120 * 0 means no refresh needed.
2121 *
2122 * > 0 means you need to refresh this and you MUST call
2123 * ocfs2_complete_lock_res_refresh afterwards. */
2124static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2125{
2126    unsigned long flags;
2127    int status = 0;
2128
2129refresh_check:
2130    spin_lock_irqsave(&lockres->l_lock, flags);
2131    if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2132        spin_unlock_irqrestore(&lockres->l_lock, flags);
2133        goto bail;
2134    }
2135
2136    if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2137        spin_unlock_irqrestore(&lockres->l_lock, flags);
2138
2139        ocfs2_wait_on_refreshing_lock(lockres);
2140        goto refresh_check;
2141    }
2142
2143    /* Ok, I'll be the one to refresh this lock. */
2144    lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2145    spin_unlock_irqrestore(&lockres->l_lock, flags);
2146
2147    status = 1;
2148bail:
2149    mlog(0, "status %d\n", status);
2150    return status;
2151}
2152
/* If status is nonzero, I'll mark the lockres as not being in
 * refresh anymore, but I won't clear the needs-refresh flag. */
2155static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2156                           int status)
2157{
2158    unsigned long flags;
2159
2160    spin_lock_irqsave(&lockres->l_lock, flags);
2161    lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2162    if (!status)
2163        lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2164    spin_unlock_irqrestore(&lockres->l_lock, flags);
2165
2166    wake_up(&lockres->l_event);
2167}
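
/*
 * A minimal sketch of the protocol these two helpers form
 * (ocfs2_inode_lock_update() and ocfs2_super_lock() below are real
 * instances of it; do_refresh() is a hypothetical stand-in):
 *
 *    status = ocfs2_should_refresh_lock_res(lockres);
 *    if (status) {
 *        status = do_refresh();
 *        ocfs2_complete_lock_res_refresh(lockres, status);
 *    }
 */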
2168
/* May or may not return a bh, depending on whether we had to go to disk. */
2170static int ocfs2_inode_lock_update(struct inode *inode,
2171                  struct buffer_head **bh)
2172{
2173    int status = 0;
2174    struct ocfs2_inode_info *oi = OCFS2_I(inode);
2175    struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2176    struct ocfs2_dinode *fe;
2177    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2178
2179    if (ocfs2_mount_local(osb))
2180        goto bail;
2181
2182    spin_lock(&oi->ip_lock);
2183    if (oi->ip_flags & OCFS2_INODE_DELETED) {
2184        mlog(0, "Orphaned inode %llu was deleted while we "
2185             "were waiting on a lock. ip_flags = 0x%x\n",
2186             (unsigned long long)oi->ip_blkno, oi->ip_flags);
2187        spin_unlock(&oi->ip_lock);
2188        status = -ENOENT;
2189        goto bail;
2190    }
2191    spin_unlock(&oi->ip_lock);
2192
2193    if (!ocfs2_should_refresh_lock_res(lockres))
2194        goto bail;
2195
2196    /* This will discard any caching information we might have had
2197     * for the inode metadata. */
2198    ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2199
2200    ocfs2_extent_map_trunc(inode, 0);
2201
2202    if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2203        mlog(0, "Trusting LVB on inode %llu\n",
2204             (unsigned long long)oi->ip_blkno);
2205        ocfs2_refresh_inode_from_lvb(inode);
2206    } else {
2207        /* Boo, we have to go to disk. */
2208        /* read bh, cast, ocfs2_refresh_inode */
2209        status = ocfs2_read_inode_block(inode, bh);
2210        if (status < 0) {
2211            mlog_errno(status);
2212            goto bail_refresh;
2213        }
2214        fe = (struct ocfs2_dinode *) (*bh)->b_data;
2215
2216        /* This is a good chance to make sure we're not
2217         * locking an invalid object. ocfs2_read_inode_block()
2218         * already checked that the inode block is sane.
2219         *
2220         * We bug on a stale inode here because we checked
2221         * above whether it was wiped from disk. The wiping
2222         * node provides a guarantee that we receive that
2223         * message and can mark the inode before dropping any
2224         * locks associated with it. */
2225        mlog_bug_on_msg(inode->i_generation !=
2226                le32_to_cpu(fe->i_generation),
2227                "Invalid dinode %llu disk generation: %u "
2228                "inode->i_generation: %u\n",
2229                (unsigned long long)oi->ip_blkno,
2230                le32_to_cpu(fe->i_generation),
2231                inode->i_generation);
2232        mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2233                !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2234                "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2235                (unsigned long long)oi->ip_blkno,
2236                (unsigned long long)le64_to_cpu(fe->i_dtime),
2237                le32_to_cpu(fe->i_flags));
2238
2239        ocfs2_refresh_inode(inode, fe);
2240        ocfs2_track_lock_refresh(lockres);
2241    }
2242
2243    status = 0;
2244bail_refresh:
2245    ocfs2_complete_lock_res_refresh(lockres, status);
2246bail:
2247    return status;
2248}
2249
2250static int ocfs2_assign_bh(struct inode *inode,
2251               struct buffer_head **ret_bh,
2252               struct buffer_head *passed_bh)
2253{
2254    int status;
2255
2256    if (passed_bh) {
2257        /* Ok, the update went to disk for us, use the
2258         * returned bh. */
2259        *ret_bh = passed_bh;
2260        get_bh(*ret_bh);
2261
2262        return 0;
2263    }
2264
2265    status = ocfs2_read_inode_block(inode, ret_bh);
2266    if (status < 0)
2267        mlog_errno(status);
2268
2269    return status;
2270}
2271
/*
 * Returns < 0 on error if the callback will never be called; otherwise
 * the result of the lock will be communicated via the callback.
 */
2276int ocfs2_inode_lock_full_nested(struct inode *inode,
2277                 struct buffer_head **ret_bh,
2278                 int ex,
2279                 int arg_flags,
2280                 int subclass)
2281{
2282    int status, level, acquired;
2283    u32 dlm_flags;
2284    struct ocfs2_lock_res *lockres = NULL;
2285    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2286    struct buffer_head *local_bh = NULL;
2287
2288    BUG_ON(!inode);
2289
2290    mlog(0, "inode %llu, take %s META lock\n",
2291         (unsigned long long)OCFS2_I(inode)->ip_blkno,
2292         ex ? "EXMODE" : "PRMODE");
2293
2294    status = 0;
2295    acquired = 0;
    /* We'll allow faking a read-only metadata lock for
     * read-only devices. */
2298    if (ocfs2_is_hard_readonly(osb)) {
2299        if (ex)
2300            status = -EROFS;
2301        goto bail;
2302    }
2303
2304    if (ocfs2_mount_local(osb))
2305        goto local;
2306
2307    if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2308        ocfs2_wait_for_recovery(osb);
2309
2310    lockres = &OCFS2_I(inode)->ip_inode_lockres;
2311    level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2312    dlm_flags = 0;
2313    if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2314        dlm_flags |= DLM_LKF_NOQUEUE;
2315
2316    status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2317                      arg_flags, subclass, _RET_IP_);
2318    if (status < 0) {
2319        if (status != -EAGAIN && status != -EIOCBRETRY)
2320            mlog_errno(status);
2321        goto bail;
2322    }
2323
2324    /* Notify the error cleanup path to drop the cluster lock. */
2325    acquired = 1;
2326
2327    /* We wait twice because a node may have died while we were in
2328     * the lower dlm layers. The second time though, we've
2329     * committed to owning this lock so we don't allow signals to
2330     * abort the operation. */
2331    if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2332        ocfs2_wait_for_recovery(osb);
2333
2334local:
2335    /*
2336     * We only see this flag if we're being called from
2337     * ocfs2_read_locked_inode(). It means we're locking an inode
2338     * which hasn't been populated yet, so clear the refresh flag
2339     * and let the caller handle it.
2340     */
2341    if (inode->i_state & I_NEW) {
2342        status = 0;
2343        if (lockres)
2344            ocfs2_complete_lock_res_refresh(lockres, 0);
2345        goto bail;
2346    }
2347
2348    /* This is fun. The caller may want a bh back, or it may
2349     * not. ocfs2_inode_lock_update definitely wants one in, but
2350     * may or may not read one, depending on what's in the
2351     * LVB. The result of all of this is that we've *only* gone to
2352     * disk if we have to, so the complexity is worthwhile. */
2353    status = ocfs2_inode_lock_update(inode, &local_bh);
2354    if (status < 0) {
2355        if (status != -ENOENT)
2356            mlog_errno(status);
2357        goto bail;
2358    }
2359
2360    if (ret_bh) {
2361        status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2362        if (status < 0) {
2363            mlog_errno(status);
2364            goto bail;
2365        }
2366    }
2367
2368bail:
2369    if (status < 0) {
2370        if (ret_bh && (*ret_bh)) {
2371            brelse(*ret_bh);
2372            *ret_bh = NULL;
2373        }
2374        if (acquired)
2375            ocfs2_inode_unlock(inode, ex);
2376    }
2377
2378    if (local_bh)
2379        brelse(local_bh);
2380
2381    return status;
2382}
2383
/*
 * This is working around a lock inversion between tasks acquiring DLM
 * locks while holding a page lock and the downconvert thread which
 * blocks dlm lock acquisition while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have
 * blocked waiting for the downconvert thread. In that case we unlock
 * our page so the downconvert thread can make progress. Once we've
 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
 * that called us can bubble that back up into the VFS, which will then
 * immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so
 * that the lock has a good chance of being cached on this node by the time
 * the VFS calls back to retry the aop. This has the potential to livelock
 * as nodes ping locks back and forth, but that's a risk we're willing to
 * take in exchange for a simple way around the lock inversion.
 */
2406int ocfs2_inode_lock_with_page(struct inode *inode,
2407                  struct buffer_head **ret_bh,
2408                  int ex,
2409                  struct page *page)
2410{
2411    int ret;
2412
2413    ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2414    if (ret == -EAGAIN) {
2415        unlock_page(page);
2416        if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2417            ocfs2_inode_unlock(inode, ex);
2418        ret = AOP_TRUNCATED_PAGE;
2419    }
2420
2421    return ret;
2422}
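
/*
 * Sketch of the expected aop-side handling (illustrative; the real
 * callers are address_space operations that hold a page lock):
 *
 *    ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
 *    if (ret != 0)
 *        return ret;    (may be AOP_TRUNCATED_PAGE, telling the
 *                        VFS to retry the aop call)
 */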
2423
2424int ocfs2_inode_lock_atime(struct inode *inode,
2425              struct vfsmount *vfsmnt,
2426              int *level)
2427{
2428    int ret;
2429
2430    ret = ocfs2_inode_lock(inode, NULL, 0);
2431    if (ret < 0) {
2432        mlog_errno(ret);
2433        return ret;
2434    }
2435
2436    /*
2437     * If we should update atime, we will get EX lock,
2438     * otherwise we just get PR lock.
2439     */
2440    if (ocfs2_should_update_atime(inode, vfsmnt)) {
2441        struct buffer_head *bh = NULL;
2442
2443        ocfs2_inode_unlock(inode, 0);
2444        ret = ocfs2_inode_lock(inode, &bh, 1);
2445        if (ret < 0) {
2446            mlog_errno(ret);
2447            return ret;
2448        }
2449        *level = 1;
2450        if (ocfs2_should_update_atime(inode, vfsmnt))
2451            ocfs2_update_inode_atime(inode, bh);
2452        if (bh)
2453            brelse(bh);
2454    } else
2455        *level = 0;
2456
2457    return ret;
2458}
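
/*
 * Callers are expected to pair this with an unlock at the level handed
 * back in *level (a sketch; the read paths are the real users):
 *
 *    ret = ocfs2_inode_lock_atime(inode, filp->f_path.mnt, &level);
 *    if (ret < 0)
 *        return ret;
 *    ... do the read ...
 *    ocfs2_inode_unlock(inode, level);
 */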
2459
2460void ocfs2_inode_unlock(struct inode *inode,
2461               int ex)
2462{
2463    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2464    struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2465    struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2466
2467    mlog(0, "inode %llu drop %s META lock\n",
2468         (unsigned long long)OCFS2_I(inode)->ip_blkno,
2469         ex ? "EXMODE" : "PRMODE");
2470
2471    if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2472        !ocfs2_mount_local(osb))
2473        ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2474}
2475
2476int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2477{
2478    struct ocfs2_lock_res *lockres;
2479    struct ocfs2_orphan_scan_lvb *lvb;
2480    int status = 0;
2481
2482    if (ocfs2_is_hard_readonly(osb))
2483        return -EROFS;
2484
2485    if (ocfs2_mount_local(osb))
2486        return 0;
2487
2488    lockres = &osb->osb_orphan_scan.os_lockres;
2489    status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2490    if (status < 0)
2491        return status;
2492
2493    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2494    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2495        lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2496        *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2497    else
2498        *seqno = osb->osb_orphan_scan.os_seqno + 1;
2499
2500    return status;
2501}
2502
2503void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2504{
2505    struct ocfs2_lock_res *lockres;
2506    struct ocfs2_orphan_scan_lvb *lvb;
2507
2508    if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2509        lockres = &osb->osb_orphan_scan.os_lockres;
2510        lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2511        lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2512        lvb->lvb_os_seqno = cpu_to_be32(seqno);
2513        ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2514    }
2515}
2516
2517int ocfs2_super_lock(struct ocfs2_super *osb,
2518             int ex)
2519{
2520    int status = 0;
2521    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2522    struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2523
2524    if (ocfs2_is_hard_readonly(osb))
2525        return -EROFS;
2526
2527    if (ocfs2_mount_local(osb))
2528        goto bail;
2529
2530    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2531    if (status < 0) {
2532        mlog_errno(status);
2533        goto bail;
2534    }
2535
2536    /* The super block lock path is really in the best position to
2537     * know when resources covered by the lock need to be
2538     * refreshed, so we do it here. Of course, making sense of
2539     * everything is up to the caller :) */
2540    status = ocfs2_should_refresh_lock_res(lockres);
2541    if (status < 0) {
2542        mlog_errno(status);
2543        goto bail;
2544    }
2545    if (status) {
2546        status = ocfs2_refresh_slot_info(osb);
2547
2548        ocfs2_complete_lock_res_refresh(lockres, status);
2549
2550        if (status < 0)
2551            mlog_errno(status);
2552        ocfs2_track_lock_refresh(lockres);
2553    }
2554bail:
2555    return status;
2556}
2557
2558void ocfs2_super_unlock(struct ocfs2_super *osb,
2559            int ex)
2560{
2561    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2562    struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2563
2564    if (!ocfs2_mount_local(osb))
2565        ocfs2_cluster_unlock(osb, lockres, level);
2566}
2567
2568int ocfs2_rename_lock(struct ocfs2_super *osb)
2569{
2570    int status;
2571    struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2572
2573    if (ocfs2_is_hard_readonly(osb))
2574        return -EROFS;
2575
2576    if (ocfs2_mount_local(osb))
2577        return 0;
2578
2579    status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2580    if (status < 0)
2581        mlog_errno(status);
2582
2583    return status;
2584}
2585
2586void ocfs2_rename_unlock(struct ocfs2_super *osb)
2587{
2588    struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2589
2590    if (!ocfs2_mount_local(osb))
2591        ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2592}
2593
2594int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2595{
2596    int status;
2597    struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2598
2599    if (ocfs2_is_hard_readonly(osb))
2600        return -EROFS;
2601
2602    if (ocfs2_mount_local(osb))
2603        return 0;
2604
2605    status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2606                    0, 0);
2607    if (status < 0)
2608        mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2609
2610    return status;
2611}
2612
2613void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2614{
2615    struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2616
2617    if (!ocfs2_mount_local(osb))
2618        ocfs2_cluster_unlock(osb, lockres,
2619                     ex ? LKM_EXMODE : LKM_PRMODE);
2620}
2621
2622int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2623{
2624    int ret;
2625    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2626    struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2627    struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2628
2629    BUG_ON(!dl);
2630
2631    if (ocfs2_is_hard_readonly(osb))
2632        return -EROFS;
2633
2634    if (ocfs2_mount_local(osb))
2635        return 0;
2636
2637    ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2638    if (ret < 0)
2639        mlog_errno(ret);
2640
2641    return ret;
2642}
2643
2644void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2645{
2646    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2647    struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2648    struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2649
2650    if (!ocfs2_mount_local(osb))
2651        ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2652}
2653
/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can outlive the mount, so
 * we can't rely on the ocfs2_super always existing. */
2657static void ocfs2_dlm_debug_free(struct kref *kref)
2658{
2659    struct ocfs2_dlm_debug *dlm_debug;
2660
2661    dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2662
2663    kfree(dlm_debug);
2664}
2665
2666void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2667{
2668    if (dlm_debug)
2669        kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2670}
2671
2672static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2673{
2674    kref_get(&debug->d_refcnt);
2675}
2676
2677struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2678{
2679    struct ocfs2_dlm_debug *dlm_debug;
2680
2681    dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2682    if (!dlm_debug) {
2683        mlog_errno(-ENOMEM);
2684        goto out;
2685    }
2686
2687    kref_init(&dlm_debug->d_refcnt);
2688    INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2689    dlm_debug->d_locking_state = NULL;
2690out:
2691    return dlm_debug;
2692}
2693
2694/* Access to this is arbitrated for us via seq_file->sem. */
2695struct ocfs2_dlm_seq_priv {
2696    struct ocfs2_dlm_debug *p_dlm_debug;
2697    struct ocfs2_lock_res p_iter_res;
2698    struct ocfs2_lock_res p_tmp_res;
2699};
2700
2701static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2702                         struct ocfs2_dlm_seq_priv *priv)
2703{
2704    struct ocfs2_lock_res *iter, *ret = NULL;
2705    struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2706
2707    assert_spin_locked(&ocfs2_dlm_tracking_lock);
2708
2709    list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2710        /* discover the head of the list */
2711        if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2712            mlog(0, "End of list found, %p\n", ret);
2713            break;
2714        }
2715
        /* We track our "dummy" iteration lockres by a NULL
         * l_ops field. */
2718        if (iter->l_ops != NULL) {
2719            ret = iter;
2720            break;
2721        }
2722    }
2723
2724    return ret;
2725}
2726
2727static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2728{
2729    struct ocfs2_dlm_seq_priv *priv = m->private;
2730    struct ocfs2_lock_res *iter;
2731
2732    spin_lock(&ocfs2_dlm_tracking_lock);
2733    iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2734    if (iter) {
        /* Since a lockres has the lifetime of its container
         * (which can be an inode, an ocfs2_super, etc.) we want to
         * copy this out to a temporary lockres while still
         * under the spinlock. Obviously after this we can't
         * trust any pointers on the copy returned, but that's
         * ok as the information we want isn't typically held
         * in them. */
2742        priv->p_tmp_res = *iter;
2743        iter = &priv->p_tmp_res;
2744    }
2745    spin_unlock(&ocfs2_dlm_tracking_lock);
2746
2747    return iter;
2748}
2749
2750static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2751{
2752}
2753
2754static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2755{
2756    struct ocfs2_dlm_seq_priv *priv = m->private;
2757    struct ocfs2_lock_res *iter = v;
2758    struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2759
2760    spin_lock(&ocfs2_dlm_tracking_lock);
2761    iter = ocfs2_dlm_next_res(iter, priv);
2762    list_del_init(&dummy->l_debug_list);
2763    if (iter) {
2764        list_add(&dummy->l_debug_list, &iter->l_debug_list);
2765        priv->p_tmp_res = *iter;
2766        iter = &priv->p_tmp_res;
2767    }
2768    spin_unlock(&ocfs2_dlm_tracking_lock);
2769
2770    return iter;
2771}
2772
2773/*
2774 * Version is used by debugfs.ocfs2 to determine the format being used
2775 *
2776 * New in version 2
2777 * - Lock stats printed
2778 * New in version 3
2779 * - Max time in lock stats is in usecs (instead of nsecs)
2780 */
2781#define OCFS2_DLM_DEBUG_STR_VERSION 3
2782static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2783{
2784    int i;
2785    char *lvb;
2786    struct ocfs2_lock_res *lockres = v;
2787
2788    if (!lockres)
2789        return -EINVAL;
2790
2791    seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2792
2793    if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2794        seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2795               lockres->l_name,
2796               (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2797    else
2798        seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2799
2800    seq_printf(m, "%d\t"
2801           "0x%lx\t"
2802           "0x%x\t"
2803           "0x%x\t"
2804           "%u\t"
2805           "%u\t"
2806           "%d\t"
2807           "%d\t",
2808           lockres->l_level,
2809           lockres->l_flags,
2810           lockres->l_action,
2811           lockres->l_unlock_action,
2812           lockres->l_ro_holders,
2813           lockres->l_ex_holders,
2814           lockres->l_requested,
2815           lockres->l_blocking);
2816
2817    /* Dump the raw LVB */
2818    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
    for (i = 0; i < DLM_LVB_LEN; i++)
2820        seq_printf(m, "0x%x\t", lvb[i]);
2821
2822#ifdef CONFIG_OCFS2_FS_STATS
2823# define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets)
2824# define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets)
2825# define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail)
2826# define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail)
2827# define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total)
2828# define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total)
2829# define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max)
2830# define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max)
2831# define lock_refresh(_l) ((_l)->l_lock_refresh)
2832#else
2833# define lock_num_prmode(_l) (0)
2834# define lock_num_exmode(_l) (0)
2835# define lock_num_prmode_failed(_l) (0)
2836# define lock_num_exmode_failed(_l) (0)
2837# define lock_total_prmode(_l) (0ULL)
2838# define lock_total_exmode(_l) (0ULL)
2839# define lock_max_prmode(_l) (0)
2840# define lock_max_exmode(_l) (0)
2841# define lock_refresh(_l) (0)
2842#endif
    /* The following seq_printf was added in version 2 of this output */
2844    seq_printf(m, "%u\t"
2845           "%u\t"
2846           "%u\t"
2847           "%u\t"
2848           "%llu\t"
2849           "%llu\t"
2850           "%u\t"
2851           "%u\t"
2852           "%u\t",
2853           lock_num_prmode(lockres),
2854           lock_num_exmode(lockres),
2855           lock_num_prmode_failed(lockres),
2856           lock_num_exmode_failed(lockres),
2857           lock_total_prmode(lockres),
2858           lock_total_exmode(lockres),
2859           lock_max_prmode(lockres),
2860           lock_max_exmode(lockres),
2861           lock_refresh(lockres));
2862
2863    /* End the line */
2864    seq_printf(m, "\n");
2865    return 0;
2866}
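
/*
 * Taken together, the seq_printf() calls above emit one tab-separated
 * line per lockres: version, lock name, level, flags, action,
 * unlock_action, ro_holders, ex_holders, requested, blocking, the raw
 * LVB bytes, and (since version 2) the nine lock statistics fields.
 * debugfs.ocfs2 uses the leading version number to parse the rest.
 */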
2867
2868static const struct seq_operations ocfs2_dlm_seq_ops = {
2869    .start = ocfs2_dlm_seq_start,
2870    .stop = ocfs2_dlm_seq_stop,
2871    .next = ocfs2_dlm_seq_next,
2872    .show = ocfs2_dlm_seq_show,
2873};
2874
2875static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2876{
2877    struct seq_file *seq = file->private_data;
2878    struct ocfs2_dlm_seq_priv *priv = seq->private;
2879    struct ocfs2_lock_res *res = &priv->p_iter_res;
2880
2881    ocfs2_remove_lockres_tracking(res);
2882    ocfs2_put_dlm_debug(priv->p_dlm_debug);
2883    return seq_release_private(inode, file);
2884}
2885
2886static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2887{
2888    int ret;
2889    struct ocfs2_dlm_seq_priv *priv;
2890    struct seq_file *seq;
2891    struct ocfs2_super *osb;
2892
2893    priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2894    if (!priv) {
2895        ret = -ENOMEM;
2896        mlog_errno(ret);
2897        goto out;
2898    }
2899    osb = inode->i_private;
2900    ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2901    priv->p_dlm_debug = osb->osb_dlm_debug;
2902    INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2903
2904    ret = seq_open(file, &ocfs2_dlm_seq_ops);
2905    if (ret) {
2906        kfree(priv);
2907        mlog_errno(ret);
2908        goto out;
2909    }
2910
2911    seq = file->private_data;
2912    seq->private = priv;
2913
2914    ocfs2_add_lockres_tracking(&priv->p_iter_res,
2915                   priv->p_dlm_debug);
2916
2917out:
2918    return ret;
2919}
2920
2921static const struct file_operations ocfs2_dlm_debug_fops = {
2922    .open = ocfs2_dlm_debug_open,
2923    .release = ocfs2_dlm_debug_release,
2924    .read = seq_read,
2925    .llseek = seq_lseek,
2926};
2927
2928static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2929{
2930    int ret = 0;
2931    struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2932
2933    dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2934                             S_IFREG|S_IRUSR,
2935                             osb->osb_debug_root,
2936                             osb,
2937                             &ocfs2_dlm_debug_fops);
2938    if (!dlm_debug->d_locking_state) {
2939        ret = -EINVAL;
2940        mlog(ML_ERROR,
2941             "Unable to create locking state debugfs file.\n");
2942        goto out;
2943    }
2944
2945    ocfs2_get_dlm_debug(dlm_debug);
2946out:
2947    return ret;
2948}
2949
2950static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2951{
2952    struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2953
2954    if (dlm_debug) {
2955        debugfs_remove(dlm_debug->d_locking_state);
2956        ocfs2_put_dlm_debug(dlm_debug);
2957    }
2958}
2959
2960int ocfs2_dlm_init(struct ocfs2_super *osb)
2961{
2962    int status = 0;
2963    struct ocfs2_cluster_connection *conn = NULL;
2964
2965    if (ocfs2_mount_local(osb)) {
2966        osb->node_num = 0;
2967        goto local;
2968    }
2969
2970    status = ocfs2_dlm_init_debug(osb);
2971    if (status < 0) {
2972        mlog_errno(status);
2973        goto bail;
2974    }
2975
2976    /* launch downconvert thread */
2977    osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2978    if (IS_ERR(osb->dc_task)) {
2979        status = PTR_ERR(osb->dc_task);
2980        osb->dc_task = NULL;
2981        mlog_errno(status);
2982        goto bail;
2983    }
2984
2985    /* for now, uuid == domain */
2986    status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2987                       osb->uuid_str,
2988                       strlen(osb->uuid_str),
2989                       &lproto, ocfs2_do_node_down, osb,
2990                       &conn);
2991    if (status) {
2992        mlog_errno(status);
2993        goto bail;
2994    }
2995
2996    status = ocfs2_cluster_this_node(&osb->node_num);
2997    if (status < 0) {
2998        mlog_errno(status);
2999        mlog(ML_ERROR,
3000             "could not find this host's node number\n");
3001        ocfs2_cluster_disconnect(conn, 0);
3002        goto bail;
3003    }
3004
3005local:
3006    ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3007    ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3008    ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
3009    ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3010
3011    osb->cconn = conn;
3012
3013    status = 0;
3014bail:
3015    if (status < 0) {
3016        ocfs2_dlm_shutdown_debug(osb);
3017        if (osb->dc_task)
3018            kthread_stop(osb->dc_task);
3019    }
3020
3021    return status;
3022}
3023
3024void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3025            int hangup_pending)
3026{
3027    ocfs2_drop_osb_locks(osb);
3028
3029    /*
3030     * Now that we have dropped all locks and ocfs2_dismount_volume()
3031     * has disabled recovery, the DLM won't be talking to us. It's
3032     * safe to tear things down before disconnecting the cluster.
3033     */
3034
3035    if (osb->dc_task) {
3036        kthread_stop(osb->dc_task);
3037        osb->dc_task = NULL;
3038    }
3039
3040    ocfs2_lock_res_free(&osb->osb_super_lockres);
3041    ocfs2_lock_res_free(&osb->osb_rename_lockres);
3042    ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3043    ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3044
3045    ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3046    osb->cconn = NULL;
3047
3048    ocfs2_dlm_shutdown_debug(osb);
3049}
3050
3051static int ocfs2_drop_lock(struct ocfs2_super *osb,
3052               struct ocfs2_lock_res *lockres)
3053{
3054    int ret;
3055    unsigned long flags;
3056    u32 lkm_flags = 0;
3057
3058    /* We didn't get anywhere near actually using this lockres. */
3059    if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3060        goto out;
3061
3062    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3063        lkm_flags |= DLM_LKF_VALBLK;
3064
3065    spin_lock_irqsave(&lockres->l_lock, flags);
3066
3067    mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3068            "lockres %s, flags 0x%lx\n",
3069            lockres->l_name, lockres->l_flags);
3070
3071    while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3072        mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3073             "%u, unlock_action = %u\n",
3074             lockres->l_name, lockres->l_flags, lockres->l_action,
3075             lockres->l_unlock_action);
3076
3077        spin_unlock_irqrestore(&lockres->l_lock, flags);
3078
3079        /* XXX: Today we just wait on any busy
3080         * locks... Perhaps we need to cancel converts in the
3081         * future? */
3082        ocfs2_wait_on_busy_lock(lockres);
3083
3084        spin_lock_irqsave(&lockres->l_lock, flags);
3085    }
3086
3087    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3088        if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3089            lockres->l_level == DLM_LOCK_EX &&
3090            !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3091            lockres->l_ops->set_lvb(lockres);
3092    }
3093
3094    if (lockres->l_flags & OCFS2_LOCK_BUSY)
3095        mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3096             lockres->l_name);
3097    if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3098        mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3099
3100    if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3101        spin_unlock_irqrestore(&lockres->l_lock, flags);
3102        goto out;
3103    }
3104
3105    lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3106
3107    /* make sure we never get here while waiting for an ast to
3108     * fire. */
3109    BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3110
3111    /* is this necessary? */
3112    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3113    lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3114    spin_unlock_irqrestore(&lockres->l_lock, flags);
3115
3116    mlog(0, "lock %s\n", lockres->l_name);
3117
3118    ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3119    if (ret) {
3120        ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3121        mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3122        ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3123        BUG();
3124    }
3125    mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3126         lockres->l_name);
3127
3128    ocfs2_wait_on_busy_lock(lockres);
3129out:
3130    return 0;
3131}
3132
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we may still have to wait for it to be
 * dequeued by the downconvert thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3139void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3140{
3141    int status;
3142    struct ocfs2_mask_waiter mw;
3143    unsigned long flags;
3144
3145    ocfs2_init_mask_waiter(&mw);
3146
3147    spin_lock_irqsave(&lockres->l_lock, flags);
3148    lockres->l_flags |= OCFS2_LOCK_FREEING;
3149    while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3150        lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3151        spin_unlock_irqrestore(&lockres->l_lock, flags);
3152
3153        mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3154
3155        status = ocfs2_wait_for_mask(&mw);
3156        if (status)
3157            mlog_errno(status);
3158
3159        spin_lock_irqsave(&lockres->l_lock, flags);
3160    }
3161    spin_unlock_irqrestore(&lockres->l_lock, flags);
3162}
3163
3164void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3165                   struct ocfs2_lock_res *lockres)
3166{
3167    int ret;
3168
3169    ocfs2_mark_lockres_freeing(lockres);
3170    ret = ocfs2_drop_lock(osb, lockres);
3171    if (ret)
3172        mlog_errno(ret);
3173}
3174
3175static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3176{
3177    ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3178    ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3179    ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3180    ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3181}
3182
3183int ocfs2_drop_inode_locks(struct inode *inode)
3184{
3185    int status, err;
3186
3187    /* No need to call ocfs2_mark_lockres_freeing here -
3188     * ocfs2_clear_inode has done it for us. */
3189
3190    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3191                  &OCFS2_I(inode)->ip_open_lockres);
3192    if (err < 0)
3193        mlog_errno(err);
3194
3195    status = err;
3196
3197    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3198                  &OCFS2_I(inode)->ip_inode_lockres);
3199    if (err < 0)
3200        mlog_errno(err);
3201    if (err < 0 && !status)
3202        status = err;
3203
3204    err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3205                  &OCFS2_I(inode)->ip_rw_lockres);
3206    if (err < 0)
3207        mlog_errno(err);
3208    if (err < 0 && !status)
3209        status = err;
3210
3211    return status;
3212}
3213
3214static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3215                          int new_level)
3216{
3217    assert_spin_locked(&lockres->l_lock);
3218
3219    BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3220
3221    if (lockres->l_level <= new_level) {
3222        mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3223             "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3224             "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3225             new_level, list_empty(&lockres->l_blocked_list),
3226             list_empty(&lockres->l_mask_waiters), lockres->l_type,
3227             lockres->l_flags, lockres->l_ro_holders,
3228             lockres->l_ex_holders, lockres->l_action,
3229             lockres->l_unlock_action, lockres->l_requested,
3230             lockres->l_blocking, lockres->l_pending_gen);
3231        BUG();
3232    }
3233
3234    mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3235         lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3236
3237    lockres->l_action = OCFS2_AST_DOWNCONVERT;
3238    lockres->l_requested = new_level;
3239    lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3240    return lockres_set_pending(lockres);
3241}
3242
3243static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3244                  struct ocfs2_lock_res *lockres,
3245                  int new_level,
3246                  int lvb,
3247                  unsigned int generation)
3248{
3249    int ret;
3250    u32 dlm_flags = DLM_LKF_CONVERT;
3251
3252    mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3253         lockres->l_level, new_level);
3254
3255    if (lvb)
3256        dlm_flags |= DLM_LKF_VALBLK;
3257
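    /* This is a convert on the existing lksb; DLM_LKF_VALBLK asks the
     * DLM to carry our LVB along with the downconvert. */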
3258    ret = ocfs2_dlm_lock(osb->cconn,
3259                 new_level,
3260                 &lockres->l_lksb,
3261                 dlm_flags,
3262                 lockres->l_name,
3263                 OCFS2_LOCK_ID_MAX_LEN - 1);
3264    lockres_clear_pending(lockres, generation, osb);
3265    if (ret) {
3266        ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3267        ocfs2_recover_from_dlm_error(lockres, 1);
3268        goto bail;
3269    }
3270
3271    ret = 0;
3272bail:
3273    return ret;
3274}
3275
3276/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3277static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3278                        struct ocfs2_lock_res *lockres)
3279{
3280    assert_spin_locked(&lockres->l_lock);
3281
3282    if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3283        /* If we're already trying to cancel a lock conversion
3284         * then just drop the spinlock and allow the caller to
3285         * requeue this lock. */
3286        mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3287        return 0;
3288    }
3289
3290    /* were we in a convert when the bast fired? */
3291    BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3292           lockres->l_action != OCFS2_AST_DOWNCONVERT);
3293    /* set things up for the unlockast to know to just
3294     * clear out the ast_action and unset busy, etc. */
3295    lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3296
3297    mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3298            "lock %s, invalid flags: 0x%lx\n",
3299            lockres->l_name, lockres->l_flags);
3300
3301    mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3302
3303    return 1;
3304}
3305
3306static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3307                struct ocfs2_lock_res *lockres)
3308{
3309    int ret;
3310
3311    ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3312                   DLM_LKF_CANCEL);
3313    if (ret) {
3314        ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3315        ocfs2_recover_from_dlm_error(lockres, 0);
3316    }
3317
3318    mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3319
3320    return ret;
3321}
3322
3323static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3324                  struct ocfs2_lock_res *lockres,
3325                  struct ocfs2_unblock_ctl *ctl)
3326{
3327    unsigned long flags;
3328    int blocking;
3329    int new_level;
3330    int level;
3331    int ret = 0;
3332    int set_lvb = 0;
3333    unsigned int gen;
3334
3335    spin_lock_irqsave(&lockres->l_lock, flags);
3336
3337recheck:
3338    /*
3339     * Is it still blocking? If not, we have no more work to do.
3340     */
3341    if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3342        BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3343        spin_unlock_irqrestore(&lockres->l_lock, flags);
3344        ret = 0;
3345        goto leave;
3346    }
3347
3348    if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3349        /* XXX
3350         * This is a *big* race. The OCFS2_LOCK_PENDING flag
3351         * exists entirely for one reason - another thread has set
3352         * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3353         *
3354         * If we do ocfs2_cancel_convert() before the other thread
3355         * calls dlm_lock(), our cancel will do nothing. We will
3356         * get no ast, and we will have no way of knowing the
3357         * cancel failed. Meanwhile, the other thread will call
3358         * into dlm_lock() and wait...forever.
3359         *
3360         * Why forever? Because another node has asked for the
3361         * lock first; that's why we're here in unblock_lock().
3362         *
3363         * The solution is OCFS2_LOCK_PENDING. When PENDING is
3364         * set, we just requeue the unblock. Only when the other
3365         * thread has called dlm_lock() and cleared PENDING will
3366         * we then cancel their request.
3367         *
3368         * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3369         * at the same time they set OCFS2_LOCK_BUSY. They must
3370         * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3371         */
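        /*
         * A sketch of that caller-side pattern, as it appears in the
         * downconvert path later in this function:
         *
         *     spin_lock_irqsave(&lockres->l_lock, flags);
         *     lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
         *     gen = lockres_set_pending(lockres);
         *     spin_unlock_irqrestore(&lockres->l_lock, flags);
         *     ret = ocfs2_dlm_lock(...);
         *     lockres_clear_pending(lockres, gen, osb);
         */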
3372        if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3373            mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3374                 lockres->l_name);
3375            goto leave_requeue;
3376        }
3377
3378        ctl->requeue = 1;
3379        ret = ocfs2_prepare_cancel_convert(osb, lockres);
3380        spin_unlock_irqrestore(&lockres->l_lock, flags);
3381        if (ret) {
3382            ret = ocfs2_cancel_convert(osb, lockres);
3383            if (ret < 0)
3384                mlog_errno(ret);
3385        }
3386        goto leave;
3387    }
3388
3389    /*
3390     * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3391     * set when the ast is received for an upconvert just before the
3392     * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3393     * on the heels of the ast, we want to delay the downconvert just
3394     * enough to allow the up requestor to do its task. Because this
3395     * lock is in the blocked queue, the lock will be downconverted
3396     * as soon as the requestor is done with the lock.
3397     */
3398    if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3399        goto leave_requeue;
3400
3401    /*
3402     * How can we block and yet be at NL? We were trying to upconvert
3403     * from NL and got canceled. The code comes back here, and now
3404     * we notice and clear BLOCKING.
3405     */
3406    if (lockres->l_level == DLM_LOCK_NL) {
3407        BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3408        mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3409        lockres->l_blocking = DLM_LOCK_NL;
3410        lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3411        spin_unlock_irqrestore(&lockres->l_lock, flags);
3412        goto leave;
3413    }
3414
3415    /* if we're blocking an exclusive and we have *any* holders,
3416     * then requeue. */
3417    if ((lockres->l_blocking == DLM_LOCK_EX)
3418        && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3419        mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3420             lockres->l_name, lockres->l_ex_holders,
3421             lockres->l_ro_holders);
3422        goto leave_requeue;
3423    }
3424
3425    /* If it's a PR we're blocking, then only
3426     * requeue if we've got any EX holders */
3427    if (lockres->l_blocking == DLM_LOCK_PR &&
3428        lockres->l_ex_holders) {
3429        mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3430             lockres->l_name, lockres->l_ex_holders);
3431        goto leave_requeue;
3432    }
3433
3434    /*
3435     * Can we get a lock in this state if the holder counts are
3436     * zero? The metadata unblock code used to check this.
3437     */
3438    if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3439        && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3440        mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3441             lockres->l_name);
3442        goto leave_requeue;
3443    }
3444
3445    new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
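    /* E.g. a blocking EX request forces us all the way down to NL,
     * while a blocking PR request lets us keep a PR of our own. */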
3446
3447    if (lockres->l_ops->check_downconvert
3448        && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3449        mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3450             lockres->l_name);
3451        goto leave_requeue;
3452    }
3453
3454    /* If we get here, then we know that there are no more
3455     * incompatible holders (and anyone asking for an incompatible
3456     * lock is blocked). We can now downconvert the lock */
3457    if (!lockres->l_ops->downconvert_worker)
3458        goto downconvert;
3459
3460    /* Some lockres types want to do a bit of work before
3461     * downconverting a lock. Allow that here. The worker function
3462     * may sleep, so we save off a copy of what we're blocking as
3463     * it may change while we're not holding the spin lock. */
3464    blocking = lockres->l_blocking;
3465    level = lockres->l_level;
3466    spin_unlock_irqrestore(&lockres->l_lock, flags);
3467
3468    ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3469
3470    if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3471        mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3472             lockres->l_name);
3473        goto leave;
3474    }
3475
3476    spin_lock_irqsave(&lockres->l_lock, flags);
3477    if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3478        /* If this changed underneath us, then we can't drop
3479         * it just yet. */
3480        mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3481             "Recheck\n", lockres->l_name, blocking,
3482             lockres->l_blocking, level, lockres->l_level);
3483        goto recheck;
3484    }
3485
3486downconvert:
3487    ctl->requeue = 0;
3488
3489    if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3490        if (lockres->l_level == DLM_LOCK_EX)
3491            set_lvb = 1;
3492
3493        /*
3494         * We only set the lvb if the lock has been fully
3495         * refreshed - otherwise we risk setting stale
3496         * data. If we don't set it, there's also no need to
3497         * clear it out here, as its value is still valid.
3498         */
3499        if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3500            lockres->l_ops->set_lvb(lockres);
3501    }
3502
3503    gen = ocfs2_prepare_downconvert(lockres, new_level);
3504    spin_unlock_irqrestore(&lockres->l_lock, flags);
3505    ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3506                     gen);
3507
3508leave:
3509    if (ret)
3510        mlog_errno(ret);
3511    return ret;
3512
3513leave_requeue:
3514    spin_unlock_irqrestore(&lockres->l_lock, flags);
3515    ctl->requeue = 1;
3516
3517    return 0;
3518}
3519
3520static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3521                     int blocking)
3522{
3523    struct inode *inode;
3524    struct address_space *mapping;
3525    struct ocfs2_inode_info *oi;
3526
3527    inode = ocfs2_lock_res_inode(lockres);
3528    mapping = inode->i_mapping;
3529
3530    if (S_ISDIR(inode->i_mode)) {
3531        oi = OCFS2_I(inode);
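        /* Bumping the generation lets concurrent directory readers
         * detect that this lock was downconverted underneath them. */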
3532        oi->ip_dir_lock_gen++;
3533        mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3534        goto out;
3535    }
3536
3537    if (!S_ISREG(inode->i_mode))
3538        goto out;
3539
3540    /*
3541     * We need this before the filemap_fdatawrite() so that it can
3542     * transfer the dirty bit from the PTE to the
3543     * page. Unfortunately this means that even for EX->PR
3544     * downconverts, we'll lose our mappings and have to build
3545     * them up again.
3546     */
3547    unmap_mapping_range(mapping, 0, 0, 0);
3548
3549    if (filemap_fdatawrite(mapping)) {
3550        mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3551             (unsigned long long)OCFS2_I(inode)->ip_blkno);
3552    }
3553    sync_mapping_buffers(mapping);
3554    if (blocking == DLM_LOCK_EX) {
3555        truncate_inode_pages(mapping, 0);
3556    } else {
3557        /* We only need to wait on the I/O if we're not also
3558         * truncating pages because truncate_inode_pages waits
3559         * for us above. We don't truncate pages if we're
3560         * blocking anything < DLM_LOCK_EX because we want to keep
3561         * them around in that case. */
3562        filemap_fdatawait(mapping);
3563    }
3564
3565out:
3566    return UNBLOCK_CONTINUE;
3567}
3568
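/*
 * Downconverting from EX lets other nodes read this resource from
 * disk, so our journaled changes must be checkpointed first. If they
 * aren't, kick a checkpoint off and have the caller requeue the
 * downconvert instead of waiting here.
 */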
3569static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3570                 struct ocfs2_lock_res *lockres,
3571                 int new_level)
3572{
3573    int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3574
3575    BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3576    BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3577
3578    if (checkpointed)
3579        return 1;
3580
3581    ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3582    return 0;
3583}
3584
3585static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3586                    int new_level)
3587{
3588    struct inode *inode = ocfs2_lock_res_inode(lockres);
3589
3590    return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3591}
3592
3593static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3594{
3595    struct inode *inode = ocfs2_lock_res_inode(lockres);
3596
3597    __ocfs2_stuff_meta_lvb(inode);
3598}
3599
3600/*
3601 * Does the final reference drop on our dentry lock. Right now this
3602 * happens in the downconvert thread, but we could choose to simplify the
3603 * dlmglue API and push these off to the ocfs2_wq in the future.
3604 */
3605static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3606                     struct ocfs2_lock_res *lockres)
3607{
3608    struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3609    ocfs2_dentry_lock_put(osb, dl);
3610}
3611
3612/*
3613 * d_delete() any matching dentries before the lock downconvert.
3614 *
3615 * At this point, any process waiting to destroy the
3616 * dentry_lock due to last ref count is stopped by the
3617 * OCFS2_LOCK_QUEUED flag.
3618 *
3619 * We have two potential problems
3620 *
3621 * 1) If we do the last reference drop on our dentry_lock (via dput)
3622 * we'll wind up in ocfs2_release_dentry_lock(), waiting on
3623 * the downconvert to finish. Instead we take an elevated
3624 * reference and push the drop until after we've completed our
3625 * unblock processing.
3626 *
3627 * 2) There might be another process with a final reference,
3628 * waiting on us to finish processing. If this is the case, we
3629 * detect it and exit out - there are no more dentries anyway.
3630 */
3631static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3632                       int blocking)
3633{
3634    struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3635    struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3636    struct dentry *dentry;
3637    unsigned long flags;
3638    int extra_ref = 0;
3639
3640    /*
3641     * This node is blocking another node from getting a read
3642     * lock. This happens when we've renamed within a
3643     * directory. We've forced the other nodes to d_delete(), but
3644     * we never actually dropped our lock because it's still
3645     * valid. The downconvert code will retain a PR for this node,
3646     * so there's no further work to do.
3647     */
3648    if (blocking == DLM_LOCK_PR)
3649        return UNBLOCK_CONTINUE;
3650
3651    /*
3652     * Mark this inode as potentially orphaned. The code in
3653     * ocfs2_delete_inode() will figure out whether it actually
3654     * needs to be freed or not.
3655     */
3656    spin_lock(&oi->ip_lock);
3657    oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3658    spin_unlock(&oi->ip_lock);
3659
3660    /*
3661     * Yuck. We need to make sure however that the check of
3662     * OCFS2_LOCK_FREEING and the extra reference are atomic with
3663     * respect to a reference decrement or the setting of that
3664     * flag.
3665     */
3666    spin_lock_irqsave(&lockres->l_lock, flags);
3667    spin_lock(&dentry_attach_lock);
3668    if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3669        && dl->dl_count) {
3670        dl->dl_count++;
3671        extra_ref = 1;
3672    }
3673    spin_unlock(&dentry_attach_lock);
3674    spin_unlock_irqrestore(&lockres->l_lock, flags);
3675
3676    mlog(0, "extra_ref = %d\n", extra_ref);
3677
3678    /*
3679     * We have a process waiting on us in ocfs2_dentry_iput(),
3680     * which means we can't have any more outstanding
3681     * aliases. There's no need to do any more work.
3682     */
3683    if (!extra_ref)
3684        return UNBLOCK_CONTINUE;
3685
3686    spin_lock(&dentry_attach_lock);
3687    while (1) {
3688        dentry = ocfs2_find_local_alias(dl->dl_inode,
3689                        dl->dl_parent_blkno, 1);
3690        if (!dentry)
3691            break;
3692        spin_unlock(&dentry_attach_lock);
3693
3694        mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3695             dentry->d_name.name);
3696
3697        /*
3698         * The following dcache calls may do an
3699         * iput(). Normally we don't want that from the
3700         * downconverting thread, but in this case it's ok
3701         * because the requesting node already has an
3702         * exclusive lock on the inode, so it can't be queued
3703         * for a downconvert.
3704         */
3705        d_delete(dentry);
3706        dput(dentry);
3707
3708        spin_lock(&dentry_attach_lock);
3709    }
3710    spin_unlock(&dentry_attach_lock);
3711
3712    /*
3713     * If we are the last holder of this dentry lock, there is no
3714     * reason to downconvert so skip straight to the unlock.
3715     */
3716    if (dl->dl_count == 1)
3717        return UNBLOCK_STOP_POST;
3718
3719    return UNBLOCK_CONTINUE_POST;
3720}
3721
3722static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
3723                        int new_level)
3724{
3725    struct ocfs2_refcount_tree *tree =
3726                ocfs2_lock_res_refcount_tree(lockres);
3727
3728    return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
3729}
3730
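/*
 * Once we give up the refcount tree lock, another node may modify the
 * tree, so purge our cached copies of its metadata before the
 * downconvert.
 */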
3731static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
3732                     int blocking)
3733{
3734    struct ocfs2_refcount_tree *tree =
3735                ocfs2_lock_res_refcount_tree(lockres);
3736
3737    ocfs2_metadata_cache_purge(&tree->rf_ci);
3738
3739    return UNBLOCK_CONTINUE;
3740}
3741
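/*
 * Pack the in-memory global quota state into the LVB so that the next
 * node to take this lock can refresh from the LVB instead of rereading
 * the quota file; ocfs2_refresh_qinfo() below does the matching
 * unpack. Values are stored big-endian.
 */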
3742static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3743{
3744    struct ocfs2_qinfo_lvb *lvb;
3745    struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3746    struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3747                        oinfo->dqi_gi.dqi_type);
3748
3749    lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3750    lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3751    lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3752    lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3753    lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3754    lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3755    lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3756    lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3757}
3758
3759void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3760{
3761    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3762    struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3763    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3764
3765    if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3766        ocfs2_cluster_unlock(osb, lockres, level);
3767}
3768
3769static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3770{
3771    struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3772                        oinfo->dqi_gi.dqi_type);
3773    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3774    struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3775    struct buffer_head *bh = NULL;
3776    struct ocfs2_global_disk_dqinfo *gdinfo;
3777    int status = 0;
3778
3779    if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3780        lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3781        info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3782        info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3783        oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3784        oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3785        oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3786        oinfo->dqi_gi.dqi_free_entry =
3787                    be32_to_cpu(lvb->lvb_free_entry);
3788    } else {
3789        status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
3790                             oinfo->dqi_giblk, &bh);
3791        if (status) {
3792            mlog_errno(status);
3793            goto bail;
3794        }
3795        gdinfo = (struct ocfs2_global_disk_dqinfo *)
3796                    (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3797        info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3798        info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3799        oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3800        oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3801        oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3802        oinfo->dqi_gi.dqi_free_entry =
3803                    le32_to_cpu(gdinfo->dqi_free_entry);
3804        brelse(bh);
3805        ocfs2_track_lock_refresh(lockres);
3806    }
3807
3808bail:
3809    return status;
3810}
3811
3812/* Lock quota info; this function expects at least a shared lock on the quota
3813 * file so that we can safely refresh quota info from disk. */
3814int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3815{
3816    struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3817    struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3818    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3819    int status = 0;
3820
3821    /* On RO devices, locking really isn't needed... */
3822    if (ocfs2_is_hard_readonly(osb)) {
3823        if (ex)
3824            status = -EROFS;
3825        goto bail;
3826    }
3827    if (ocfs2_mount_local(osb))
3828        goto bail;
3829
3830    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3831    if (status < 0) {
3832        mlog_errno(status);
3833        goto bail;
3834    }
3835    if (!ocfs2_should_refresh_lock_res(lockres))
3836        goto bail;
3837    /* OK, we have the lock but we need to refresh the quota info */
3838    status = ocfs2_refresh_qinfo(oinfo);
3839    if (status)
3840        ocfs2_qinfo_unlock(oinfo, ex);
3841    ocfs2_complete_lock_res_refresh(lockres, status);
3842bail:
3843    return status;
3844}
3845
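/*
 * A hypothetical caller of the pair above (sketch only; the
 * surrounding code and labels are illustrative):
 *
 *     status = ocfs2_qinfo_lock(oinfo, ex);
 *     if (status < 0)
 *         goto out;
 *     ... read or update the global quota info ...
 *     ocfs2_qinfo_unlock(oinfo, ex);
 */
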
3846int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
3847{
3848    int status;
3849    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3850    struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3851    struct ocfs2_super *osb = lockres->l_priv;
3852
3853
3854    if (ocfs2_is_hard_readonly(osb))
3855        return -EROFS;
3856
3857    if (ocfs2_mount_local(osb))
3858        return 0;
3859
3860    status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3861    if (status < 0)
3862        mlog_errno(status);
3863
3864    return status;
3865}
3866
3867void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3868{
3869    int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3870    struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3871    struct ocfs2_super *osb = lockres->l_priv;
3872
3873    if (!ocfs2_mount_local(osb))
3874        ocfs2_cluster_unlock(osb, lockres, level);
3875}
3876
3877static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3878                       struct ocfs2_lock_res *lockres)
3879{
3880    int status;
3881    struct ocfs2_unblock_ctl ctl = {0, 0,};
3882    unsigned long flags;
3883
3884    /* Our reference to the lockres in this function can be
3885     * considered valid until we remove the OCFS2_LOCK_QUEUED
3886     * flag. */
3887
3888    BUG_ON(!lockres);
3889    BUG_ON(!lockres->l_ops);
3890
3891    mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
3892
3893    /* Detect whether a lock has been marked as going away while
3894     * the downconvert thread was processing other things. A lock can
3895     * still be marked with OCFS2_LOCK_FREEING after this check,
3896     * but short-circuiting here still saves us some
3897     * work. */
3898    spin_lock_irqsave(&lockres->l_lock, flags);
3899    if (lockres->l_flags & OCFS2_LOCK_FREEING)
3900        goto unqueue;
3901    spin_unlock_irqrestore(&lockres->l_lock, flags);
3902
3903    status = ocfs2_unblock_lock(osb, lockres, &ctl);
3904    if (status < 0)
3905        mlog_errno(status);
3906
3907    spin_lock_irqsave(&lockres->l_lock, flags);
3908unqueue:
3909    if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3910        lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3911    } else
3912        ocfs2_schedule_blocked_lock(osb, lockres);
3913
3914    mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
3915         ctl.requeue ? "yes" : "no");
3916    spin_unlock_irqrestore(&lockres->l_lock, flags);
3917
3918    if (ctl.unblock_action != UNBLOCK_CONTINUE
3919        && lockres->l_ops->post_unlock)
3920        lockres->l_ops->post_unlock(osb, lockres);
3921}
3922
3923static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3924                    struct ocfs2_lock_res *lockres)
3925{
3926    assert_spin_locked(&lockres->l_lock);
3927
3928    if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3929        /* Do not schedule a lock for downconvert when it's on
3930         * the way to destruction - any nodes wanting access
3931         * to the resource will get it soon. */
3932        mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
3933             lockres->l_name, lockres->l_flags);
3934        return;
3935    }
3936
3937    lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3938
3939    spin_lock(&osb->dc_task_lock);
3940    if (list_empty(&lockres->l_blocked_list)) {
3941        list_add_tail(&lockres->l_blocked_list,
3942                  &osb->blocked_lock_list);
3943        osb->blocked_lock_count++;
3944    }
3945    spin_unlock(&osb->dc_task_lock);
3946}
3947
3948static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3949{
3950    unsigned long processed;
3951    struct ocfs2_lock_res *lockres;
3952
3953    spin_lock(&osb->dc_task_lock);
3954    /* grab this early so we know to try again if a state change and
3955     * wake happens part-way through our work */
3956    osb->dc_work_sequence = osb->dc_wake_sequence;
3957
3958    processed = osb->blocked_lock_count;
3959    while (processed) {
3960        BUG_ON(list_empty(&osb->blocked_lock_list));
3961
3962        lockres = list_entry(osb->blocked_lock_list.next,
3963                     struct ocfs2_lock_res, l_blocked_list);
3964        list_del_init(&lockres->l_blocked_list);
3965        osb->blocked_lock_count--;
3966        spin_unlock(&osb->dc_task_lock);
3967
3968        BUG_ON(!processed);
3969        processed--;
3970
3971        ocfs2_process_blocked_lock(osb, lockres);
3972
3973        spin_lock(&osb->dc_task_lock);
3974    }
3975    spin_unlock(&osb->dc_task_lock);
3976}
3977
3978static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3979{
3980    int empty = 0;
3981
3982    spin_lock(&osb->dc_task_lock);
3983    if (list_empty(&osb->blocked_lock_list))
3984        empty = 1;
3985
3986    spin_unlock(&osb->dc_task_lock);
3987    return empty;
3988}
3989
3990static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3991{
3992    int should_wake = 0;
3993
3994    spin_lock(&osb->dc_task_lock);
3995    if (osb->dc_work_sequence != osb->dc_wake_sequence)
3996        should_wake = 1;
3997    spin_unlock(&osb->dc_task_lock);
3998
3999    return should_wake;
4000}
4001
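/*
 * Main loop of the downconvert kthread: sleep until
 * ocfs2_wake_downconvert_thread() bumps dc_wake_sequence, drain the
 * blocked lock list, and exit only once we have been asked to stop
 * *and* the list is empty.
 */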
4002static int ocfs2_downconvert_thread(void *arg)
4003{
4004    int status = 0;
4005    struct ocfs2_super *osb = arg;
4006
4007    /* only quit once we've been asked to stop and there is no more
4008     * work available */
4009    while (!(kthread_should_stop() &&
4010        ocfs2_downconvert_thread_lists_empty(osb))) {
4011
4012        wait_event_interruptible(osb->dc_event,
4013                     ocfs2_downconvert_thread_should_wake(osb) ||
4014                     kthread_should_stop());
4015
4016        mlog(0, "downconvert_thread: awoken\n");
4017
4018        ocfs2_downconvert_thread_do_work(osb);
4019    }
4020
4021    osb->dc_task = NULL;
4022    return status;
4023}
4024
4025void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4026{
4027    spin_lock(&osb->dc_task_lock);
4028    /* make sure the downconvert thread gets a swipe at whatever
4029     * changes the caller may have made to the lock state */
4030    osb->dc_wake_sequence++;
4031    spin_unlock(&osb->dc_task_lock);
4032    wake_up(&osb->dc_event);
4033}
4034
