ocfs2: Implement quota recovery

Implement functions for quota recovery after a node crash. The functions
read the dead node's local quota file and sync the recorded usage
information to the global quota file.
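
For reviewers, here is a minimal stand-alone sketch, not kernel code: plain
userspace C with hypothetical names (remember_quota_slot(), MAX_SLOTS, the
sample slot numbers), illustrating the bookkeeping __ocfs2_recovery_thread()
gains below. Slots of dead nodes are remembered at most once in a small
array, and their quota files are processed only in a second phase, mirroring
the rm_quota[] handling in the diff:

#include <stdio.h>

#define MAX_SLOTS 8

/* Remember @slot_num unless it is already queued (the rm_quota[] scan). */
static void remember_quota_slot(int *rm_quota, int *rm_quota_used,
				int slot_num)
{
	int i;

	for (i = 0; i < *rm_quota_used && rm_quota[i] != slot_num; i++)
		;
	if (i == *rm_quota_used)
		rm_quota[(*rm_quota_used)++] = slot_num;
}

int main(void)
{
	int rm_quota[MAX_SLOTS];
	int rm_quota_used = 0;
	/* Slots of dead nodes as they are found; slot 2 appears twice. */
	int discovered[] = { 2, 0, 2, 5 };
	int n = sizeof(discovered) / sizeof(discovered[0]);
	int i;

	/* First phase: just note which slots will need quota recovery. */
	for (i = 0; i < n; i++)
		remember_quota_slot(rm_quota, &rm_quota_used, discovered[i]);

	/* Second phase: recover quotas for each remembered slot once. */
	for (i = 0; i < rm_quota_used; i++)
		printf("would recover quotas for slot %d\n", rm_quota[i]);

	return 0;
}

In the patch itself the second loop runs under the superblock lock, calling
ocfs2_begin_quota_recovery() for each remembered slot and queueing the result
so that ocfs2_finish_quota_recovery() can sync the local quota file into the
global one from the recovery completion worker.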

Signed-off-by: Jan Kara <jack@suse.cz>
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 11a1178..c602420 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -45,6 +45,7 @@
 #include "slot_map.h"
 #include "super.h"
 #include "sysfile.h"
+#include "quota.h"
 
 #include "buffer_head_io.h"
 
@@ -52,7 +53,7 @@
 
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num);
+			      int node_num, int slot_num);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
@@ -857,6 +858,7 @@
 	int			lri_slot;
 	struct ocfs2_dinode	*lri_la_dinode;
 	struct ocfs2_dinode	*lri_tl_dinode;
+	struct ocfs2_quota_recovery *lri_qrec;
 };
 
 /* Does the second half of the recovery process. By this point, the
@@ -877,6 +879,7 @@
 	struct ocfs2_super *osb = journal->j_osb;
 	struct ocfs2_dinode *la_dinode, *tl_dinode;
 	struct ocfs2_la_recovery_item *item, *n;
+	struct ocfs2_quota_recovery *qrec;
 	LIST_HEAD(tmp_la_list);
 
 	mlog_entry_void();
@@ -922,6 +925,16 @@
 		if (ret < 0)
 			mlog_errno(ret);
 
+		qrec = item->lri_qrec;
+		if (qrec) {
+			mlog(0, "Recovering quota files\n");
+			ret = ocfs2_finish_quota_recovery(osb, qrec,
+							  item->lri_slot);
+			if (ret < 0)
+				mlog_errno(ret);
+			/* Recovery info is already freed now */
+		}
+
 		kfree(item);
 	}
 
@@ -935,7 +948,8 @@
 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 					    int slot_num,
 					    struct ocfs2_dinode *la_dinode,
-					    struct ocfs2_dinode *tl_dinode)
+					    struct ocfs2_dinode *tl_dinode,
+					    struct ocfs2_quota_recovery *qrec)
 {
 	struct ocfs2_la_recovery_item *item;
 
@@ -950,6 +964,9 @@
 		if (tl_dinode)
 			kfree(tl_dinode);
 
+		if (qrec)
+			ocfs2_free_quota_recovery(qrec);
+
 		mlog_errno(-ENOMEM);
 		return;
 	}
@@ -958,6 +975,7 @@
 	item->lri_la_dinode = la_dinode;
 	item->lri_slot = slot_num;
 	item->lri_tl_dinode = tl_dinode;
+	item->lri_qrec = qrec;
 
 	spin_lock(&journal->j_lock);
 	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -977,6 +995,7 @@
 		ocfs2_queue_recovery_completion(journal,
 						osb->slot_num,
 						osb->local_alloc_copy,
+						NULL,
 						NULL);
 		ocfs2_schedule_truncate_log_flush(osb, 0);
 
@@ -985,11 +1004,26 @@
 	}
 }
 
+void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
+{
+	if (osb->quota_rec) {
+		ocfs2_queue_recovery_completion(osb->journal,
+						osb->slot_num,
+						NULL,
+						NULL,
+						osb->quota_rec);
+		osb->quota_rec = NULL;
+	}
+}
+
 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num;
+	int status, node_num, slot_num;
 	struct ocfs2_super *osb = arg;
 	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	int *rm_quota = NULL;
+	int rm_quota_used = 0, i;
+	struct ocfs2_quota_recovery *qrec;
 
 	mlog_entry_void();
 
@@ -998,6 +1032,11 @@
 		goto bail;
 	}
 
+	rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
+	if (!rm_quota) {
+		status = -ENOMEM;
+		goto bail;
+	}
 restart:
 	status = ocfs2_super_lock(osb, 1);
 	if (status < 0) {
@@ -1011,8 +1050,28 @@
 		 * clear it until ocfs2_recover_node() has succeeded. */
 		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
+		mlog(0, "checking node %d\n", node_num);
+		slot_num = ocfs2_node_num_to_slot(osb, node_num);
+		if (slot_num == -ENOENT) {
+			status = 0;
+			mlog(0, "no slot for this node, so no recovery "
+			     "required.\n");
+			goto skip_recovery;
+		}
+		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
 
-		status = ocfs2_recover_node(osb, node_num);
+		/* Quota recovery is a bit subtle. We cannot do it immediately
+		 * here because we would have to obtain cluster locks from the
+		 * quota files, but we also don't want to just skip it because
+		 * quota usage would then stay out of sync until some node
+		 * takes over the slot. So we remember which slots need quota
+		 * recovery and recover their quotas once all else is done. */
+		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		if (i == rm_quota_used)
+			rm_quota[rm_quota_used++] = slot_num;
+
+		status = ocfs2_recover_node(osb, node_num, slot_num);
+skip_recovery:
 		if (!status) {
 			ocfs2_recovery_map_clear(osb, node_num);
 		} else {
@@ -1034,13 +1093,27 @@
 	if (status < 0)
 		mlog_errno(status);
 
+	/* Now it is the right time to recover quotas... We have to do this
+	 * under the superblock lock so that no one can start using the slot
+	 * (and crash) before we recover it. */
+	for (i = 0; i < rm_quota_used; i++) {
+		qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
+		if (IS_ERR(qrec)) {
+			status = PTR_ERR(qrec);
+			mlog_errno(status);
+			continue;
+		}
+		ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
+						NULL, NULL, qrec);
+	}
+
 	ocfs2_super_unlock(osb, 1);
 
 	/* We always run recovery on our own orphan dir - the dead
 	 * node(s) may have disallowd a previos inode delete. Re-processing
 	 * is therefore required. */
 	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
-					NULL);
+					NULL, NULL);
 
 bail:
 	mutex_lock(&osb->recovery_lock);
@@ -1055,6 +1128,9 @@
 
 	mutex_unlock(&osb->recovery_lock);
 
+	if (rm_quota)
+		kfree(rm_quota);
+
 	mlog_exit(status);
 	/* no one is callint kthread_stop() for us so the kthread() api
 	 * requires that we call do_exit().  And it isn't exported, but
@@ -1282,31 +1358,19 @@
  * far less concerning.
  */
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num)
+			      int node_num, int slot_num)
 {
 	int status = 0;
-	int slot_num;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
 
-	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
-		   node_num, osb->node_num);
-
-	mlog(0, "checking node %d\n", node_num);
+	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
+		   node_num, slot_num, osb->node_num);
 
 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
 	BUG_ON(osb->node_num == node_num);
 
-	slot_num = ocfs2_node_num_to_slot(osb, node_num);
-	if (slot_num == -ENOENT) {
-		status = 0;
-		mlog(0, "no slot for this node, so no recovery required.\n");
-		goto done;
-	}
-
-	mlog(0, "node %d was using slot %d\n", node_num, slot_num);
-
 	status = ocfs2_replay_journal(osb, node_num, slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
@@ -1342,7 +1406,7 @@
 
 	/* This will kfree the memory pointed to by la_copy and tl_copy */
 	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy);
+					tl_copy, NULL);
 
 	status = 0;
 done: