Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 235
1 file changed, 188 insertions, 47 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index dd440bd438a9..a75ddbf9fe37 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
 #include <linux/wait.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/smp_lock.h>
 
 #include "mds_client.h"
 #include "mon_client.h"
@@ -37,6 +38,11 @@
  * are no longer valid.
  */
 
+struct ceph_reconnect_state {
+        struct ceph_pagelist *pagelist;
+        bool flock;
+};
+
 static void __wake_requests(struct ceph_mds_client *mdsc,
                             struct list_head *head);
 
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
         kfree(req->r_path1);
         kfree(req->r_path2);
         put_request_session(req);
-        ceph_unreserve_caps(&req->r_caps_reservation);
+        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
         kfree(req);
 }
 
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
 {
         req->r_tid = ++mdsc->last_tid;
         if (req->r_num_caps)
-                ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+                                  req->r_num_caps);
         dout("__register_request %p tid %lld\n", req, req->r_tid);
         ceph_mdsc_get_request(req);
         __insert_request(mdsc, req);
@@ -704,6 +711,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
 }
 
 /*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                          struct ceph_mds_session *session)
+{
+        struct ceph_mds_info *mi;
+        struct ceph_mds_session *ts;
+        int i, mds = session->s_mds;
+        int target;
+
+        if (mds >= mdsc->mdsmap->m_max_mds)
+                return;
+        mi = &mdsc->mdsmap->m_info[mds];
+        dout("open_export_target_sessions for mds%d (%d targets)\n",
+             session->s_mds, mi->num_export_targets);
+
+        for (i = 0; i < mi->num_export_targets; i++) {
+                target = mi->export_targets[i];
+                ts = __ceph_lookup_mds_session(mdsc, target);
+                if (!ts) {
+                        ts = register_session(mdsc, target);
+                        if (IS_ERR(ts))
+                                return;
+                }
+                if (session->s_state == CEPH_MDS_SESSION_NEW ||
+                    session->s_state == CEPH_MDS_SESSION_CLOSING)
+                        __open_session(mdsc, session);
+                else
+                        dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+                             i, ts, session_state_name(ts->s_state));
+                ceph_put_mds_session(ts);
+        }
+}
+
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                           struct ceph_mds_session *session)
+{
+        mutex_lock(&mdsc->mutex);
+        __open_export_target_sessions(mdsc, session);
+        mutex_unlock(&mdsc->mutex);
+}
+
+/*
  * session caps
  */
 
@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                         last_inode = NULL;
                 }
                 if (old_cap) {
-                        ceph_put_cap(old_cap);
+                        ceph_put_cap(session->s_mdsc, old_cap);
                         old_cap = NULL;
                 }
 
@@ -793,7 +845,7 @@ out:
         if (last_inode)
                 iput(last_inode);
         if (old_cap)
-                ceph_put_cap(old_cap);
+                ceph_put_cap(session->s_mdsc, old_cap);
 
         return ret;
 }
@@ -1067,15 +1119,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
  * Called under s_mutex.
  */
 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                          struct ceph_mds_session *session,
-                          int extra)
+                          struct ceph_mds_session *session)
 {
-        struct ceph_msg *msg;
+        struct ceph_msg *msg, *partial = NULL;
         struct ceph_mds_cap_release *head;
         int err = -ENOMEM;
+        int extra = mdsc->client->mount_args->cap_release_safety;
+        int num;
 
-        if (extra < 0)
-                extra = mdsc->client->mount_args->cap_release_safety;
+        dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+             extra);
 
         spin_lock(&session->s_cap_lock);
 
@@ -1084,9 +1137,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                                        struct ceph_msg,
                                        list_head);
                 head = msg->front.iov_base;
-                extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+                num = le32_to_cpu(head->num);
+                if (num) {
+                        dout(" partial %p with (%d/%d)\n", msg, num,
+                             (int)CEPH_CAPS_PER_RELEASE);
+                        extra += CEPH_CAPS_PER_RELEASE - num;
+                        partial = msg;
+                }
         }
-
         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                 spin_unlock(&session->s_cap_lock);
                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1161,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
         }
 
-        if (!list_empty(&session->s_cap_releases)) {
-                msg = list_first_entry(&session->s_cap_releases,
-                                       struct ceph_msg,
-                                       list_head);
-                head = msg->front.iov_base;
-                if (head->num) {
-                        dout(" queueing non-full %p (%d)\n", msg,
-                             le32_to_cpu(head->num));
-                        list_move_tail(&msg->list_head,
-                                       &session->s_cap_releases_done);
-                        session->s_num_cap_releases -=
-                                CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
-                }
+        if (partial) {
+                head = partial->front.iov_base;
+                num = le32_to_cpu(head->num);
+                dout(" queueing partial %p with %d/%d\n", partial, num,
+                     (int)CEPH_CAPS_PER_RELEASE);
+                list_move_tail(&partial->list_head,
+                               &session->s_cap_releases_done);
+                session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
         }
         err = 0;
         spin_unlock(&session->s_cap_lock);
@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
                 return ERR_PTR(-ENOMEM);
 
         mutex_init(&req->r_fill_mutex);
+        req->r_mdsc = mdsc;
         req->r_started = jiffies;
         req->r_resend_mds = -1;
         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1580,6 +1634,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
         req->r_mds = mds;
         req->r_attempts++;
+        if (req->r_inode) {
+                struct ceph_cap *cap =
+                        ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+                if (cap)
+                        req->r_sent_on_mseq = cap->mseq;
+                else
+                        req->r_sent_on_mseq = -1;
+        }
         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
@@ -1914,21 +1977,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
         result = le32_to_cpu(head->result);
 
         /*
-         * Tolerate 2 consecutive ESTALEs from the same mds.
-         * FIXME: we should be looking at the cap migrate_seq.
+         * Handle an ESTALE
+         * if we're not talking to the authority, send to them
+         * if the authority has changed while we weren't looking,
+         * send to new authority
+         * Otherwise we just have to return an ESTALE
          */
         if (result == -ESTALE) {
-                req->r_direct_mode = USE_AUTH_MDS;
-                req->r_num_stale++;
-                if (req->r_num_stale <= 2) {
+                dout("got ESTALE on request %llu", req->r_tid);
+                if (!req->r_inode) {
+                        /* do nothing; not an authority problem */
+                } else if (req->r_direct_mode != USE_AUTH_MDS) {
+                        dout("not using auth, setting for that now");
+                        req->r_direct_mode = USE_AUTH_MDS;
                         __do_request(mdsc, req);
                         mutex_unlock(&mdsc->mutex);
                         goto out;
+                } else {
+                        struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+                        struct ceph_cap *cap =
+                                ceph_get_cap_for_mds(ci, req->r_mds);;
+
+                        dout("already using auth");
+                        if ((!cap || cap != ci->i_auth_cap) ||
+                            (cap->mseq != req->r_sent_on_mseq)) {
+                                dout("but cap changed, so resending");
+                                __do_request(mdsc, req);
+                                mutex_unlock(&mdsc->mutex);
+                                goto out;
+                        }
                 }
-        } else {
-                req->r_num_stale = 0;
+                dout("have to return ESTALE on request %llu", req->r_tid);
         }
+
 
         if (head->safe) {
                 req->r_got_safe = true;
                 __unregister_request(mdsc, req);
@@ -1985,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
         if (err == 0) {
                 if (result == 0 && rinfo->dir_nr)
                         ceph_readdir_prepopulate(req, req->r_session);
-                ceph_unreserve_caps(&req->r_caps_reservation);
+                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
         }
         mutex_unlock(&req->r_fill_mutex);
 
@@ -2005,7 +2087,7 @@ out_err:
         }
         mutex_unlock(&mdsc->mutex);
 
-        ceph_add_cap_releases(mdsc, req->r_session, -1);
+        ceph_add_cap_releases(mdsc, req->r_session);
         mutex_unlock(&session->s_mutex);
 
         /* kick calling process */
@@ -2193,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                           void *arg)
 {
-        struct ceph_mds_cap_reconnect rec;
+        union {
+                struct ceph_mds_cap_reconnect v2;
+                struct ceph_mds_cap_reconnect_v1 v1;
+        } rec;
+        size_t reclen;
         struct ceph_inode_info *ci;
-        struct ceph_pagelist *pagelist = arg;
+        struct ceph_reconnect_state *recon_state = arg;
+        struct ceph_pagelist *pagelist = recon_state->pagelist;
         char *path;
         int pathlen, err;
         u64 pathbase;
@@ -2228,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
         spin_lock(&inode->i_lock);
         cap->seq = 0;        /* reset cap seq */
         cap->issue_seq = 0;  /* and issue_seq */
-        rec.cap_id = cpu_to_le64(cap->cap_id);
-        rec.pathbase = cpu_to_le64(pathbase);
-        rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
-        rec.issued = cpu_to_le32(cap->issued);
-        rec.size = cpu_to_le64(inode->i_size);
-        ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
-        ceph_encode_timespec(&rec.atime, &inode->i_atime);
-        rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+        if (recon_state->flock) {
+                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+                rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+                rec.v2.issued = cpu_to_le32(cap->issued);
+                rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+                rec.v2.pathbase = cpu_to_le64(pathbase);
+                rec.v2.flock_len = 0;
+                reclen = sizeof(rec.v2);
+        } else {
+                rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+                rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+                rec.v1.issued = cpu_to_le32(cap->issued);
+                rec.v1.size = cpu_to_le64(inode->i_size);
+                ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+                ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+                rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+                rec.v1.pathbase = cpu_to_le64(pathbase);
+                reclen = sizeof(rec.v1);
+        }
         spin_unlock(&inode->i_lock);
 
-        err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+        if (recon_state->flock) {
+                int num_fcntl_locks, num_flock_locks;
+
+                lock_kernel();
+                ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+                rec.v2.flock_len = (2*sizeof(u32) +
+                                    (num_fcntl_locks+num_flock_locks) *
+                                    sizeof(struct ceph_filelock));
+
+                err = ceph_pagelist_append(pagelist, &rec, reclen);
+                if (!err)
+                        err = ceph_encode_locks(inode, pagelist,
+                                                num_fcntl_locks,
+                                                num_flock_locks);
+                unlock_kernel();
+        }
 
 out:
         kfree(path);
@@ -2267,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
         int mds = session->s_mds;
         int err = -ENOMEM;
         struct ceph_pagelist *pagelist;
+        struct ceph_reconnect_state recon_state;
 
         pr_info("mds%d reconnect start\n", mds);
 
@@ -2301,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
         if (err)
                 goto fail;
-        err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+        recon_state.pagelist = pagelist;
+        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
         if (err < 0)
                 goto fail;
 
@@ -2326,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
         }
 
         reply->pagelist = pagelist;
+        if (recon_state.flock)
+                reply->hdr.version = cpu_to_le16(2);
         reply->hdr.data_len = cpu_to_le32(pagelist->length);
         reply->nr_pages = calc_pages_for(0, pagelist->length);
         ceph_con_send(&session->s_con, reply);
@@ -2376,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                 oldstate = ceph_mdsmap_get_state(oldmap, i);
                 newstate = ceph_mdsmap_get_state(newmap, i);
 
-                dout("check_new_map mds%d state %s -> %s (session %s)\n",
+                dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
                      i, ceph_mds_state_name(oldstate),
+                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                      ceph_mds_state_name(newstate),
+                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                      session_state_name(s->s_state));
 
                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2428,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                         wake_up_session_caps(s, 1);
                 }
         }
+
+        for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+                s = mdsc->sessions[i];
+                if (!s)
+                        continue;
+                if (!ceph_mdsmap_is_laggy(newmap, i))
+                        continue;
+                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+                    s->s_state == CEPH_MDS_SESSION_HUNG ||
+                    s->s_state == CEPH_MDS_SESSION_CLOSING) {
+                        dout(" connecting to export targets of laggy mds%d\n",
+                             i);
+                        __open_export_target_sessions(mdsc, s);
+                }
+        }
 }
 
@@ -2715,7 +2852,7 @@ static void delayed_work(struct work_struct *work)
                         send_renew_caps(mdsc, s);
                 else
                         ceph_con_keepalive(&s->s_con);
-                ceph_add_cap_releases(mdsc, s, -1);
+                ceph_add_cap_releases(mdsc, s);
                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                     s->s_state == CEPH_MDS_SESSION_HUNG)
                         ceph_send_cap_releases(mdsc, s);
@@ -2764,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
         spin_lock_init(&mdsc->dentry_lru_lock);
         INIT_LIST_HEAD(&mdsc->dentry_lru);
 
+        ceph_caps_init(mdsc);
+        ceph_adjust_min_caps(mdsc, client->min_caps);
+
         return 0;
 }
 
@@ -2959,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
         if (mdsc->mdsmap)
                 ceph_mdsmap_destroy(mdsc->mdsmap);
         kfree(mdsc->sessions);
 
+        ceph_caps_finalize(mdsc);
 }
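
The key idea in the encode_caps_cb() change above is that the client now emits one of two reconnect record layouts, choosing at runtime based on whether the MDS advertised the FLOCK feature, and picks the record length from a union. The stand-alone C sketch below is only an illustration of that pattern: rec_v1, rec_v2 and encode_record are simplified stand-ins, not the kernel's ceph_mds_cap_reconnect definitions, and it runs in userspace rather than in the ceph client.

/*
 * Stand-alone illustration of a versioned, union-based record encoder.
 * The struct layouts and names here (rec_v1, rec_v2, encode_record) are
 * hypothetical stand-ins for the pattern used in encode_caps_cb() above.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct rec_v1 {                 /* legacy record sent to old peers */
        uint64_t cap_id;
        uint32_t wanted;
        uint32_t issued;
        uint64_t size;
};

struct rec_v2 {                 /* newer record; lock data follows it */
        uint64_t cap_id;
        uint32_t wanted;
        uint32_t issued;
        uint32_t flock_len;     /* bytes of flock/fcntl data appended */
};

/* Choose the encoding from the peer's advertised features, mirroring the
 * recon_state->flock test in the patch, and return the record length. */
static size_t encode_record(int peer_has_flock, unsigned char *buf)
{
        union {
                struct rec_v1 v1;
                struct rec_v2 v2;
        } rec;
        size_t reclen;

        memset(&rec, 0, sizeof(rec));
        if (peer_has_flock) {
                rec.v2.cap_id = 1;
                rec.v2.flock_len = 0;   /* filled in once locks are counted */
                reclen = sizeof(rec.v2);
        } else {
                rec.v1.cap_id = 1;
                reclen = sizeof(rec.v1);
        }
        memcpy(buf, &rec, reclen);
        return reclen;
}

int main(void)
{
        unsigned char buf[64];

        printf("v1 record: %zu bytes\n", encode_record(0, buf));
        printf("v2 record: %zu bytes\n", encode_record(1, buf));
        return 0;
}

Encoding the larger record only when the peer advertises the feature is what lets the reconnect message's hdr.version bump to 2 in the diff remain compatible with older MDS daemons, which keep receiving the v1 layout.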