summaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2020-11-12 12:55:39 +0100
committerIlya Dryomov <idryomov@gmail.com>2020-12-14 23:21:49 +0100
commit566050e17e53db283d4e26b73b4b50556f97ce7b (patch)
tree1f2371c72db8f4a2692487b11586a2409e3ae2d3 /net/ceph
parent6503e0b69c9d4d78b5450db01e79328f8ed4ef21 (diff)
libceph: separate msgr1 protocol implementation
In preparation for msgr2, define internal messenger <-> protocol interface (as opposed to external messenger <-> client interface, which is struct ceph_connection_operations) consisting of try_read(), try_write(), revoke(), revoke_incoming(), opened(), reset_session() and reset_protocol() ops. The semantics are exactly the same as they are now. Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c138
1 files changed, 88 insertions, 50 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 85d20372f923..4ca7d9b594c7 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -593,6 +593,11 @@ int ceph_con_close_socket(struct ceph_connection *con)
return rc;
}
+void ceph_con_v1_reset_protocol(struct ceph_connection *con)
+{
+ con->out_skip = 0;
+}
+
static void ceph_con_reset_protocol(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
@@ -609,7 +614,7 @@ static void ceph_con_reset_protocol(struct ceph_connection *con)
con->out_msg = NULL;
}
- con->out_skip = 0;
+ ceph_con_v1_reset_protocol(con);
}
/*
@@ -631,6 +636,12 @@ static void ceph_msg_remove_list(struct list_head *head)
}
}
+void ceph_con_v1_reset_session(struct ceph_connection *con)
+{
+ con->connect_seq = 0;
+ con->peer_global_seq = 0;
+}
+
void ceph_con_reset_session(struct ceph_connection *con)
{
dout("%s con %p\n", __func__, con);
@@ -643,8 +654,7 @@ void ceph_con_reset_session(struct ceph_connection *con)
con->in_seq = 0;
con->in_seq_acked = 0;
- con->connect_seq = 0;
- con->peer_global_seq = 0;
+ ceph_con_v1_reset_session(con);
}
/*
@@ -692,12 +702,17 @@ void ceph_con_open(struct ceph_connection *con,
}
EXPORT_SYMBOL(ceph_con_open);
+bool ceph_con_v1_opened(struct ceph_connection *con)
+{
+ return con->connect_seq;
+}
+
/*
* return true if this connection ever successfully opened
*/
bool ceph_con_opened(struct ceph_connection *con)
{
- return con->connect_seq > 0;
+ return ceph_con_v1_opened(con);
}
/*
@@ -2552,7 +2567,7 @@ static int read_keepalive_ack(struct ceph_connection *con)
* Write something to the socket. Called in a worker thread when the
* socket appears to be writeable and we have something ready to send.
*/
-static int try_write(struct ceph_connection *con)
+int ceph_con_v1_try_write(struct ceph_connection *con)
{
int ret = 1;
@@ -2649,7 +2664,7 @@ out:
/*
* Read what we can from the socket.
*/
-static int try_read(struct ceph_connection *con)
+int ceph_con_v1_try_read(struct ceph_connection *con)
{
int ret = -1;
@@ -2930,7 +2945,7 @@ static void ceph_con_workfn(struct work_struct *work)
BUG_ON(con->sock);
}
- ret = try_read(con);
+ ret = ceph_con_v1_try_read(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
@@ -2940,7 +2955,7 @@ static void ceph_con_workfn(struct work_struct *work)
break;
}
- ret = try_write(con);
+ ret = ceph_con_v1_try_write(con);
if (ret < 0) {
if (ret == -EAGAIN)
continue;
@@ -3116,6 +3131,29 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
}
EXPORT_SYMBOL(ceph_con_send);
+void ceph_con_v1_revoke(struct ceph_connection *con)
+{
+ struct ceph_msg *msg = con->out_msg;
+
+ WARN_ON(con->out_skip);
+ /* footer */
+ if (con->out_msg_done) {
+ con->out_skip += con_out_kvec_skip(con);
+ } else {
+ WARN_ON(!msg->data_length);
+ con->out_skip += sizeof_footer(con);
+ }
+ /* data, middle, front */
+ if (msg->data_length)
+ con->out_skip += msg->cursor.total_resid;
+ if (msg->middle)
+ con->out_skip += con_out_kvec_skip(con);
+ con->out_skip += con_out_kvec_skip(con);
+
+ dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
+ con->out_kvec_bytes, con->out_skip);
+}
+
/*
* Revoke a message that was previously queued for send
*/
@@ -3129,39 +3167,50 @@ void ceph_msg_revoke(struct ceph_msg *msg)
}
mutex_lock(&con->mutex);
- if (!list_empty(&msg->list_head)) {
- dout("%s %p msg %p - was on queue\n", __func__, con, msg);
- list_del_init(&msg->list_head);
- msg->hdr.seq = 0;
-
- ceph_msg_put(msg);
+ if (list_empty(&msg->list_head)) {
+ WARN_ON(con->out_msg == msg);
+ dout("%s con %p msg %p not linked\n", __func__, con, msg);
+ mutex_unlock(&con->mutex);
+ return;
}
- if (con->out_msg == msg) {
- BUG_ON(con->out_skip);
- /* footer */
- if (con->out_msg_done) {
- con->out_skip += con_out_kvec_skip(con);
- } else {
- BUG_ON(!msg->data_length);
- con->out_skip += sizeof_footer(con);
- }
- /* data, middle, front */
- if (msg->data_length)
- con->out_skip += msg->cursor.total_resid;
- if (msg->middle)
- con->out_skip += con_out_kvec_skip(con);
- con->out_skip += con_out_kvec_skip(con);
- dout("%s %p msg %p - was sending, will write %d skip %d\n",
- __func__, con, msg, con->out_kvec_bytes, con->out_skip);
- msg->hdr.seq = 0;
+ dout("%s con %p msg %p was linked\n", __func__, con, msg);
+ msg->hdr.seq = 0;
+ ceph_msg_remove(msg);
+
+ if (con->out_msg == msg) {
+ WARN_ON(con->state != CEPH_CON_S_OPEN);
+ dout("%s con %p msg %p was sending\n", __func__, con, msg);
+ ceph_con_v1_revoke(con);
+ ceph_msg_put(con->out_msg);
con->out_msg = NULL;
- ceph_msg_put(msg);
+ } else {
+ dout("%s con %p msg %p not current, out_msg %p\n", __func__,
+ con, msg, con->out_msg);
}
-
mutex_unlock(&con->mutex);
}
+void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
+{
+ unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+ unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+ unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
+
+ /* skip rest of message */
+ con->in_base_pos = con->in_base_pos -
+ sizeof(struct ceph_msg_header) -
+ front_len -
+ middle_len -
+ data_len -
+ sizeof(struct ceph_msg_footer);
+
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ con->in_seq++;
+
+ dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos);
+}
+
/*
* Revoke a message that we may be reading data into
*/
@@ -3176,25 +3225,14 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg)
mutex_lock(&con->mutex);
if (con->in_msg == msg) {
- unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
- unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
- unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
- /* skip rest of message */
- dout("%s %p msg %p revoked\n", __func__, con, msg);
- con->in_base_pos = con->in_base_pos -
- sizeof(struct ceph_msg_header) -
- front_len -
- middle_len -
- data_len -
- sizeof(struct ceph_msg_footer);
+ WARN_ON(con->state != CEPH_CON_S_OPEN);
+ dout("%s con %p msg %p was recving\n", __func__, con, msg);
+ ceph_con_v1_revoke_incoming(con);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->in_tag = CEPH_MSGR_TAG_READY;
- con->in_seq++;
} else {
- dout("%s %p in_msg %p msg %p no-op\n",
- __func__, con, con->in_msg, msg);
+ dout("%s con %p msg %p not current, in_msg %p\n", __func__,
+ con, msg, con->in_msg);
}
mutex_unlock(&con->mutex);
}