diff options
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r-- | net/ceph/mon_client.c | 457 |
1 files changed, 252 insertions, 205 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index de85dddc3dc0..cf638c009cfa 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc) ceph_msg_revoke(monc->m_subscribe); ceph_msg_revoke_incoming(monc->m_subscribe_ack); ceph_con_close(&monc->con); - monc->cur_mon = -1; + monc->pending_auth = 0; ceph_auth_reset(monc->auth); } /* - * Open a session with a (new) monitor. + * Pick a new monitor at random and set cur_mon. If we are repicking + * (i.e. cur_mon is already set), be sure to pick a different one. */ -static int __open_session(struct ceph_mon_client *monc) +static void pick_new_mon(struct ceph_mon_client *monc) { - char r; - int ret; + int old_mon = monc->cur_mon; - if (monc->cur_mon < 0) { - get_random_bytes(&r, 1); - monc->cur_mon = r % monc->monmap->num_mon; - dout("open_session num=%d r=%d -> mon%d\n", - monc->monmap->num_mon, r, monc->cur_mon); - monc->sub_sent = 0; - monc->sub_renew_after = jiffies; /* i.e., expired */ - monc->want_next_osdmap = !!monc->want_next_osdmap; - - dout("open_session mon%d opening\n", monc->cur_mon); - ceph_con_open(&monc->con, - CEPH_ENTITY_TYPE_MON, monc->cur_mon, - &monc->monmap->mon_inst[monc->cur_mon].addr); - - /* send an initial keepalive to ensure our timestamp is - * valid by the time we are in an OPENED state */ - ceph_con_keepalive(&monc->con); - - /* initiatiate authentication handshake */ - ret = ceph_auth_build_hello(monc->auth, - monc->m_auth->front.iov_base, - monc->m_auth->front_alloc_len); - __send_prepared_auth_request(monc, ret); + BUG_ON(monc->monmap->num_mon < 1); + + if (monc->monmap->num_mon == 1) { + monc->cur_mon = 0; } else { - dout("open_session mon%d already open\n", monc->cur_mon); + int max = monc->monmap->num_mon; + int o = -1; + int n; + + if (monc->cur_mon >= 0) { + if (monc->cur_mon < monc->monmap->num_mon) + o = monc->cur_mon; + if (o >= 0) + max--; + } + + n = prandom_u32() % max; + if (o >= 0 && n >= o) + n++; + + monc->cur_mon = n; } - return 0; + + dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, + monc->cur_mon, monc->monmap->num_mon); +} + +/* + * Open a session with a new monitor. + */ +static void __open_session(struct ceph_mon_client *monc) +{ + int ret; + + pick_new_mon(monc); + + monc->hunting = true; + if (monc->had_a_connection) { + monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; + if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) + monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; + } + + monc->sub_renew_after = jiffies; /* i.e., expired */ + monc->sub_renew_sent = 0; + + dout("%s opening mon%d\n", __func__, monc->cur_mon); + ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, + &monc->monmap->mon_inst[monc->cur_mon].addr); + + /* + * send an initial keepalive to ensure our timestamp is valid + * by the time we are in an OPENED state + */ + ceph_con_keepalive(&monc->con); + + /* initiate authentication handshake */ + ret = ceph_auth_build_hello(monc->auth, + monc->m_auth->front.iov_base, + monc->m_auth->front_alloc_len); + BUG_ON(ret <= 0); + __send_prepared_auth_request(monc, ret); } -static bool __sub_expired(struct ceph_mon_client *monc) +static void reopen_session(struct ceph_mon_client *monc) { - return time_after_eq(jiffies, monc->sub_renew_after); + if (!monc->hunting) + pr_info("mon%d %s session lost, hunting for new mon\n", + monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr)); + + __close_session(monc); + __open_session(monc); } /* @@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc) */ static void __schedule_delayed(struct ceph_mon_client *monc) { - struct ceph_options *opt = monc->client->options; unsigned long delay; - if (monc->cur_mon < 0 || __sub_expired(monc)) { - delay = 10 * HZ; - } else { - delay = 20 * HZ; - if (opt->monc_ping_timeout > 0) - delay = min(delay, opt->monc_ping_timeout / 3); - } + if (monc->hunting) + delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; + else + delay = CEPH_MONC_PING_INTERVAL; + dout("__schedule_delayed after %lu\n", delay); - schedule_delayed_work(&monc->delayed_work, - round_jiffies_relative(delay)); + mod_delayed_work(system_wq, &monc->delayed_work, + round_jiffies_relative(delay)); } +const char *ceph_sub_str[] = { + [CEPH_SUB_MDSMAP] = "mdsmap", + [CEPH_SUB_MONMAP] = "monmap", + [CEPH_SUB_OSDMAP] = "osdmap", +}; + /* - * Send subscribe request for mdsmap and/or osdmap. + * Send subscribe request for one or more maps, according to + * monc->subs. */ static void __send_subscribe(struct ceph_mon_client *monc) { - dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", - (unsigned int)monc->sub_sent, __sub_expired(monc), - monc->want_next_osdmap); - if ((__sub_expired(monc) && !monc->sub_sent) || - monc->want_next_osdmap == 1) { - struct ceph_msg *msg = monc->m_subscribe; - struct ceph_mon_subscribe_item *i; - void *p, *end; - int num; - - p = msg->front.iov_base; - end = p + msg->front_alloc_len; - - num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; - ceph_encode_32(&p, num); - - if (monc->want_next_osdmap) { - dout("__send_subscribe to 'osdmap' %u\n", - (unsigned int)monc->have_osdmap); - ceph_encode_string(&p, end, "osdmap", 6); - i = p; - i->have = cpu_to_le64(monc->have_osdmap); - i->onetime = 1; - p += sizeof(*i); - monc->want_next_osdmap = 2; /* requested */ - } - if (monc->want_mdsmap) { - dout("__send_subscribe to 'mdsmap' %u+\n", - (unsigned int)monc->have_mdsmap); - ceph_encode_string(&p, end, "mdsmap", 6); - i = p; - i->have = cpu_to_le64(monc->have_mdsmap); - i->onetime = 0; - p += sizeof(*i); - } - ceph_encode_string(&p, end, "monmap", 6); - i = p; - i->have = 0; - i->onetime = 0; - p += sizeof(*i); - - msg->front.iov_len = p - msg->front.iov_base; - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_msg_revoke(msg); - ceph_con_send(&monc->con, ceph_msg_get(msg)); - - monc->sub_sent = jiffies | 1; /* never 0 */ + struct ceph_msg *msg = monc->m_subscribe; + void *p = msg->front.iov_base; + void *const end = p + msg->front_alloc_len; + int num = 0; + int i; + + dout("%s sent %lu\n", __func__, monc->sub_renew_sent); + + BUG_ON(monc->cur_mon < 0); + + if (!monc->sub_renew_sent) + monc->sub_renew_sent = jiffies | 1; /* never 0 */ + + msg->hdr.version = cpu_to_le16(2); + + for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { + if (monc->subs[i].want) + num++; } + BUG_ON(num < 1); /* monmap sub is always there */ + ceph_encode_32(&p, num); + for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { + const char *s = ceph_sub_str[i]; + + if (!monc->subs[i].want) + continue; + + dout("%s %s start %llu flags 0x%x\n", __func__, s, + le64_to_cpu(monc->subs[i].item.start), + monc->subs[i].item.flags); + ceph_encode_string(&p, end, s, strlen(s)); + memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); + p += sizeof(monc->subs[i].item); + } + + BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19)); + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + ceph_msg_revoke(msg); + ceph_con_send(&monc->con, ceph_msg_get(msg)); } static void handle_subscribe_ack(struct ceph_mon_client *monc, @@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, seconds = le32_to_cpu(h->duration); mutex_lock(&monc->mutex); - if (monc->hunting) { - pr_info("mon%d %s session established\n", - monc->cur_mon, - ceph_pr_addr(&monc->con.peer_addr.in_addr)); - monc->hunting = false; + if (monc->sub_renew_sent) { + monc->sub_renew_after = monc->sub_renew_sent + + (seconds >> 1) * HZ - 1; + dout("%s sent %lu duration %d renew after %lu\n", __func__, + monc->sub_renew_sent, seconds, monc->sub_renew_after); + monc->sub_renew_sent = 0; + } else { + dout("%s sent %lu renew after %lu, ignoring\n", __func__, + monc->sub_renew_sent, monc->sub_renew_after); } - dout("handle_subscribe_ack after %d seconds\n", seconds); - monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; - monc->sub_sent = 0; mutex_unlock(&monc->mutex); return; bad: @@ -272,36 +309,82 @@ bad: } /* - * Keep track of which maps we have + * Register interest in a map + * + * @sub: one of CEPH_SUB_* + * @epoch: X for "every map since X", or 0 for "just the latest" */ -int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) +static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, + u32 epoch, bool continuous) +{ + __le64 start = cpu_to_le64(epoch); + u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; + + dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], + epoch, continuous); + + if (monc->subs[sub].want && + monc->subs[sub].item.start == start && + monc->subs[sub].item.flags == flags) + return false; + + monc->subs[sub].item.start = start; + monc->subs[sub].item.flags = flags; + monc->subs[sub].want = true; + + return true; +} + +bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, + bool continuous) { + bool need_request; + mutex_lock(&monc->mutex); - monc->have_mdsmap = got; + need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); mutex_unlock(&monc->mutex); - return 0; + + return need_request; } -EXPORT_SYMBOL(ceph_monc_got_mdsmap); +EXPORT_SYMBOL(ceph_monc_want_map); -int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) +/* + * Keep track of which maps we have + * + * @sub: one of CEPH_SUB_* + */ +static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, + u32 epoch) +{ + dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); + + if (monc->subs[sub].want) { + if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) + monc->subs[sub].want = false; + else + monc->subs[sub].item.start = cpu_to_le64(epoch + 1); + } + + monc->subs[sub].have = epoch; +} + +void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) { mutex_lock(&monc->mutex); - monc->have_osdmap = got; - monc->want_next_osdmap = 0; + __ceph_monc_got_map(monc, sub, epoch); mutex_unlock(&monc->mutex); - return 0; } +EXPORT_SYMBOL(ceph_monc_got_map); /* * Register interest in the next osdmap */ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) { - dout("request_next_osdmap have %u\n", monc->have_osdmap); + dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have); mutex_lock(&monc->mutex); - if (!monc->want_next_osdmap) - monc->want_next_osdmap = 1; - if (monc->want_next_osdmap < 2) + if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, + monc->subs[CEPH_SUB_OSDMAP].have + 1, false)) __send_subscribe(monc); mutex_unlock(&monc->mutex); } @@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, long ret; mutex_lock(&monc->mutex); - while (monc->have_osdmap < epoch) { + while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { mutex_unlock(&monc->mutex); if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT; ret = wait_event_interruptible_timeout(monc->client->auth_wq, - monc->have_osdmap >= epoch, - ceph_timeout_jiffies(timeout)); + monc->subs[CEPH_SUB_OSDMAP].have >= epoch, + ceph_timeout_jiffies(timeout)); if (ret < 0) return ret; @@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, EXPORT_SYMBOL(ceph_monc_wait_osdmap); /* - * + * Open a session with a random monitor. Request monmap and osdmap, + * which are waited upon in __ceph_open_session(). */ int ceph_monc_open_session(struct ceph_mon_client *monc) { mutex_lock(&monc->mutex); + __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); + __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); __open_session(monc); __schedule_delayed(monc); mutex_unlock(&monc->mutex); @@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_open_session); -/* - * We require the fsid and global_id in order to initialize our - * debugfs dir. - */ -static bool have_debugfs_info(struct ceph_mon_client *monc) -{ - dout("have_debugfs_info fsid %d globalid %lld\n", - (int)monc->client->have_fsid, monc->auth->global_id); - return monc->client->have_fsid && monc->auth->global_id > 0; -} - static void ceph_monc_handle_map(struct ceph_mon_client *monc, struct ceph_msg *msg) { struct ceph_client *client = monc->client; struct ceph_monmap *monmap = NULL, *old = monc->monmap; void *p, *end; - int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); - had_debugfs_info = have_debugfs_info(monc); - dout("handle_monmap\n"); p = msg->front.iov_base; end = p + msg->front.iov_len; @@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, client->monc.monmap = monmap; kfree(old); - if (!client->have_fsid) { - client->have_fsid = true; - if (!had_debugfs_info && have_debugfs_info(monc)) { - pr_info("client%lld fsid %pU\n", - ceph_client_id(monc->client), - &monc->client->fsid); - init_debugfs = 1; - } - mutex_unlock(&monc->mutex); - - if (init_debugfs) { - /* - * do debugfs initialization without mutex to avoid - * creating a locking dependency - */ - ceph_debugfs_client_init(monc->client); - } + __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); + client->have_fsid = true; - goto out_unlocked; - } out: mutex_unlock(&monc->mutex); -out_unlocked: wake_up_all(&client->auth_wq); } @@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work) dout("monc delayed_work\n"); mutex_lock(&monc->mutex); if (monc->hunting) { - __close_session(monc); - __open_session(monc); /* continue hunting */ + dout("%s continuing hunt\n", __func__); + reopen_session(monc); } else { - struct ceph_options *opt = monc->client->options; int is_auth = ceph_auth_is_authenticated(monc->auth); if (ceph_con_keepalive_expired(&monc->con, - opt->monc_ping_timeout)) { + CEPH_MONC_PING_TIMEOUT)) { dout("monc keepalive timeout\n"); is_auth = 0; - __close_session(monc); - monc->hunting = true; - __open_session(monc); + reopen_session(monc); } if (!monc->hunting) { @@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work) __validate_auth(monc); } - if (is_auth) - __send_subscribe(monc); + if (is_auth) { + unsigned long now = jiffies; + + dout("%s renew subs? now %lu renew after %lu\n", + __func__, now, monc->sub_renew_after); + if (time_after_eq(now, monc->sub_renew_after)) + __send_subscribe(monc); + } } __schedule_delayed(monc); mutex_unlock(&monc->mutex); @@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) &monc->client->msgr); monc->cur_mon = -1; - monc->hunting = true; - monc->sub_renew_after = jiffies; - monc->sub_sent = 0; + monc->had_a_connection = false; + monc->hunt_mult = 1; INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); monc->generic_request_tree = RB_ROOT; monc->num_generic_requests = 0; monc->last_tid = 0; - monc->have_mdsmap = 0; - monc->have_osdmap = 0; - monc->want_next_osdmap = 1; return 0; out_auth_reply: @@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - + monc->cur_mon = -1; mutex_unlock(&monc->mutex); /* @@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_stop); +static void finish_hunting(struct ceph_mon_client *monc) +{ + if (monc->hunting) { + dout("%s found mon%d\n", __func__, monc->cur_mon); + monc->hunting = false; + monc->had_a_connection = true; + monc->hunt_mult /= 2; /* reduce by 50% */ + if (monc->hunt_mult < 1) + monc->hunt_mult = 1; + } +} + static void handle_auth_reply(struct ceph_mon_client *monc, struct ceph_msg *msg) { int ret; int was_auth = 0; - int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); - had_debugfs_info = have_debugfs_info(monc); was_auth = ceph_auth_is_authenticated(monc->auth); monc->pending_auth = 0; ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, monc->m_auth->front_alloc_len); + if (ret > 0) { + __send_prepared_auth_request(monc, ret); + goto out; + } + + finish_hunting(monc); + if (ret < 0) { monc->client->auth_err = ret; - wake_up_all(&monc->client->auth_wq); - } else if (ret > 0) { - __send_prepared_auth_request(monc, ret); } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { dout("authenticated, starting session\n"); @@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc, __send_subscribe(monc); __resend_generic_request(monc); - } - if (!had_debugfs_info && have_debugfs_info(monc)) { - pr_info("client%lld fsid %pU\n", - ceph_client_id(monc->client), - &monc->client->fsid); - init_debugfs = 1; + pr_info("mon%d %s session established\n", monc->cur_mon, + ceph_pr_addr(&monc->con.peer_addr.in_addr)); } - mutex_unlock(&monc->mutex); - if (init_debugfs) { - /* - * do debugfs initialization without mutex to avoid - * creating a locking dependency - */ - ceph_debugfs_client_init(monc->client); - } +out: + mutex_unlock(&monc->mutex); + if (monc->client->auth_err < 0) + wake_up_all(&monc->client->auth_wq); } static int __validate_auth(struct ceph_mon_client *monc) @@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con) { struct ceph_mon_client *monc = con->private; - if (!monc) - return; - - dout("mon_fault\n"); mutex_lock(&monc->mutex); - if (!con->private) - goto out; - - if (!monc->hunting) - pr_info("mon%d %s session lost, " - "hunting for new mon\n", monc->cur_mon, - ceph_pr_addr(&monc->con.peer_addr.in_addr)); - - __close_session(monc); - if (!monc->hunting) { - /* start hunting */ - monc->hunting = true; - __open_session(monc); - } else { - /* already hunting, let's wait a bit */ - __schedule_delayed(monc); + dout("%s mon%d\n", __func__, monc->cur_mon); + if (monc->cur_mon >= 0) { + if (!monc->hunting) { + dout("%s hunting for new mon\n", __func__); + reopen_session(monc); + __schedule_delayed(monc); + } else { + dout("%s already hunting\n", __func__); + } } -out: mutex_unlock(&monc->mutex); } |