Lines Matching refs:monc
37 static int __validate_auth(struct ceph_mon_client *monc);
110 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
112 monc->pending_auth = 1;
113 monc->m_auth->front.iov_len = len;
114 monc->m_auth->hdr.front_len = cpu_to_le32(len);
115 ceph_msg_revoke(monc->m_auth);
116 ceph_msg_get(monc->m_auth); /* keep our ref */
117 ceph_con_send(&monc->con, monc->m_auth);
123 static void __close_session(struct ceph_mon_client *monc)
125 dout("__close_session closing mon%d\n", monc->cur_mon);
126 ceph_msg_revoke(monc->m_auth);
127 ceph_msg_revoke_incoming(monc->m_auth_reply);
128 ceph_msg_revoke(monc->m_subscribe);
129 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
130 ceph_con_close(&monc->con);
132 monc->pending_auth = 0;
133 ceph_auth_reset(monc->auth);
140 static void pick_new_mon(struct ceph_mon_client *monc)
142 int old_mon = monc->cur_mon;
144 BUG_ON(monc->monmap->num_mon < 1);
146 if (monc->monmap->num_mon == 1) {
147 monc->cur_mon = 0;
149 int max = monc->monmap->num_mon;
153 if (monc->cur_mon >= 0) {
154 if (monc->cur_mon < monc->monmap->num_mon)
155 o = monc->cur_mon;
164 monc->cur_mon = n;
168 monc->cur_mon, monc->monmap->num_mon);
174 static void __open_session(struct ceph_mon_client *monc)
178 pick_new_mon(monc);
180 monc->hunting = true;
181 if (monc->had_a_connection) {
182 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
183 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
184 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
187 monc->sub_renew_after = jiffies; /* i.e., expired */
188 monc->sub_renew_sent = 0;
190 dout("%s opening mon%d\n", __func__, monc->cur_mon);
191 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
192 &monc->monmap->mon_inst[monc->cur_mon].addr);
198 ceph_con_keepalive(&monc->con);
201 ret = ceph_auth_build_hello(monc->auth,
202 monc->m_auth->front.iov_base,
203 monc->m_auth->front_alloc_len);
205 __send_prepared_auth_request(monc, ret);
208 static void reopen_session(struct ceph_mon_client *monc)
210 if (!monc->hunting)
212 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
214 __close_session(monc);
215 __open_session(monc);
218 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
220 mutex_lock(&monc->mutex);
221 reopen_session(monc);
222 mutex_unlock(&monc->mutex);
225 static void un_backoff(struct ceph_mon_client *monc)
227 monc->hunt_mult /= 2; /* reduce by 50% */
228 if (monc->hunt_mult < 1)
229 monc->hunt_mult = 1;
230 dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
236 static void __schedule_delayed(struct ceph_mon_client *monc)
240 if (monc->hunting)
241 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
246 mod_delayed_work(system_wq, &monc->delayed_work,
259 * monc->subs.
261 static void __send_subscribe(struct ceph_mon_client *monc)
263 struct ceph_msg *msg = monc->m_subscribe;
269 dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
271 BUG_ON(monc->cur_mon < 0);
273 if (!monc->sub_renew_sent)
274 monc->sub_renew_sent = jiffies | 1; /* never 0 */
278 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
279 if (monc->subs[i].want)
284 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
288 if (!monc->subs[i].want)
293 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
294 len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
297 le64_to_cpu(monc->subs[i].item.start),
298 monc->subs[i].item.flags);
300 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
301 p += sizeof(monc->subs[i].item);
308 ceph_con_send(&monc->con, ceph_msg_get(msg));
311 static void handle_subscribe_ack(struct ceph_mon_client *monc,
321 mutex_lock(&monc->mutex);
322 if (monc->sub_renew_sent) {
327 monc->sub_renew_after = monc->sub_renew_sent +
330 monc->sub_renew_sent, seconds, monc->sub_renew_after);
331 monc->sub_renew_sent = 0;
334 monc->sub_renew_sent, monc->sub_renew_after);
336 mutex_unlock(&monc->mutex);
349 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
358 if (monc->subs[sub].want &&
359 monc->subs[sub].item.start == start &&
360 monc->subs[sub].item.flags == flags)
363 monc->subs[sub].item.start = start;
364 monc->subs[sub].item.flags = flags;
365 monc->subs[sub].want = true;
370 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
375 mutex_lock(&monc->mutex);
376 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
377 mutex_unlock(&monc->mutex);
388 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
393 if (monc->subs[sub].want) {
394 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
395 monc->subs[sub].want = false;
397 monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
400 monc->subs[sub].have = epoch;
403 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
405 mutex_lock(&monc->mutex);
406 __ceph_monc_got_map(monc, sub, epoch);
407 mutex_unlock(&monc->mutex);
411 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
413 mutex_lock(&monc->mutex);
414 __send_subscribe(monc);
415 mutex_unlock(&monc->mutex);
425 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
431 mutex_lock(&monc->mutex);
432 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
433 mutex_unlock(&monc->mutex);
438 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
439 monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
444 mutex_lock(&monc->mutex);
447 mutex_unlock(&monc->mutex);
456 int ceph_monc_open_session(struct ceph_mon_client *monc)
458 mutex_lock(&monc->mutex);
459 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
460 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
461 __open_session(monc);
462 __schedule_delayed(monc);
463 mutex_unlock(&monc->mutex);
468 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
471 struct ceph_client *client = monc->client;
475 mutex_lock(&monc->mutex);
494 kfree(monc->monmap);
495 monc->monmap = monmap;
497 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
501 mutex_unlock(&monc->mutex);
539 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
547 req->monc = monc;
558 struct ceph_mon_client *monc = req->monc;
563 req->tid = ++monc->last_tid;
564 insert_generic_request(&monc->generic_request_tree, req);
567 static void send_generic_request(struct ceph_mon_client *monc,
574 ceph_con_send(&monc->con, ceph_msg_get(req->request));
579 struct ceph_mon_client *monc = req->monc;
582 erase_generic_request(&monc->generic_request_tree, req);
605 struct ceph_mon_client *monc = req->monc;
610 mutex_lock(&monc->mutex);
611 lookup_req = lookup_generic_request(&monc->generic_request_tree,
618 mutex_unlock(&monc->mutex);
639 struct ceph_mon_client *monc = con->private;
644 mutex_lock(&monc->mutex);
645 req = lookup_generic_request(&monc->generic_request_tree, tid);
660 mutex_unlock(&monc->mutex);
667 static void handle_statfs_reply(struct ceph_mon_client *monc,
679 mutex_lock(&monc->mutex);
680 req = lookup_generic_request(&monc->generic_request_tree, tid);
682 mutex_unlock(&monc->mutex);
689 mutex_unlock(&monc->mutex);
702 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
709 req = alloc_generic_request(monc, GFP_NOFS);
725 mutex_lock(&monc->mutex);
732 h->fsid = monc->monmap->fsid;
735 send_generic_request(monc, req);
736 mutex_unlock(&monc->mutex);
745 static void handle_get_version_reply(struct ceph_mon_client *monc,
761 mutex_lock(&monc->mutex);
762 req = lookup_generic_request(&monc->generic_request_tree, handle);
764 mutex_unlock(&monc->mutex);
771 mutex_unlock(&monc->mutex);
782 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
787 req = alloc_generic_request(monc, GFP_NOIO);
805 mutex_lock(&monc->mutex);
815 send_generic_request(monc, req);
816 mutex_unlock(&monc->mutex);
830 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
836 req = __ceph_monc_get_version(monc, what, NULL, 0);
854 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
859 req = __ceph_monc_get_version(monc, what, cb, private_data);
868 static void handle_command_ack(struct ceph_mon_client *monc,
882 mutex_lock(&monc->mutex);
883 req = lookup_generic_request(&monc->generic_request_tree, tid);
885 mutex_unlock(&monc->mutex);
891 mutex_unlock(&monc->mutex);
902 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
910 req = alloc_generic_request(monc, GFP_NOIO);
923 mutex_lock(&monc->mutex);
929 h->fsid = monc->monmap->fsid;
933 send_generic_request(monc, req);
934 mutex_unlock(&monc->mutex);
943 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
949 ret = do_mon_command_vargs(monc, fmt, ap);
954 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
959 ret = do_mon_command(monc,
971 ret = do_mon_command(monc,
987 return ceph_wait_for_latest_osdmap(monc->client, 0);
994 static void __resend_generic_request(struct ceph_mon_client *monc)
999 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1003 ceph_con_send(&monc->con, ceph_msg_get(req->request));
1014 struct ceph_mon_client *monc =
1017 mutex_lock(&monc->mutex);
1018 dout("%s mon%d\n", __func__, monc->cur_mon);
1019 if (monc->cur_mon < 0) {
1023 if (monc->hunting) {
1025 reopen_session(monc);
1027 int is_auth = ceph_auth_is_authenticated(monc->auth);
1030 if (ceph_con_keepalive_expired(&monc->con,
1032 dout("monc keepalive timeout\n");
1034 reopen_session(monc);
1037 if (!monc->hunting) {
1038 ceph_con_keepalive(&monc->con);
1039 __validate_auth(monc);
1040 un_backoff(monc);
1044 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1048 __func__, now, monc->sub_renew_after);
1049 if (time_after_eq(now, monc->sub_renew_after))
1050 __send_subscribe(monc);
1053 __schedule_delayed(monc);
1056 mutex_unlock(&monc->mutex);
1063 static int build_initial_monmap(struct ceph_mon_client *monc)
1065 struct ceph_options *opt = monc->client->options;
1071 monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1073 if (!monc->monmap)
1076 monc->monmap->mon_inst[i].addr = mon_addr[i];
1077 monc->monmap->mon_inst[i].addr.nonce = 0;
1078 monc->monmap->mon_inst[i].name.type =
1080 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
1082 monc->monmap->num_mon = num_mon;
1086 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1091 memset(monc, 0, sizeof(*monc));
1092 monc->client = cl;
1093 monc->monmap = NULL;
1094 mutex_init(&monc->mutex);
1096 err = build_initial_monmap(monc);
1102 monc->auth = ceph_auth_init(cl->options->name,
1104 if (IS_ERR(monc->auth)) {
1105 err = PTR_ERR(monc->auth);
1108 monc->auth->want_keys =
1114 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1117 if (!monc->m_subscribe_ack)
1120 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1122 if (!monc->m_subscribe)
1125 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1127 if (!monc->m_auth_reply)
1130 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1131 monc->pending_auth = 0;
1132 if (!monc->m_auth)
1135 ceph_con_init(&monc->con, monc, &mon_con_ops,
1136 &monc->client->msgr);
1138 monc->cur_mon = -1;
1139 monc->had_a_connection = false;
1140 monc->hunt_mult = 1;
1142 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1143 monc->generic_request_tree = RB_ROOT;
1144 monc->last_tid = 0;
1146 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1151 ceph_msg_put(monc->m_auth_reply);
1153 ceph_msg_put(monc->m_subscribe);
1155 ceph_msg_put(monc->m_subscribe_ack);
1157 ceph_auth_destroy(monc->auth);
1159 kfree(monc->monmap);
1165 void ceph_monc_stop(struct ceph_mon_client *monc)
1169 mutex_lock(&monc->mutex);
1170 __close_session(monc);
1171 monc->hunting = false;
1172 monc->cur_mon = -1;
1173 mutex_unlock(&monc->mutex);
1175 cancel_delayed_work_sync(&monc->delayed_work);
1185 ceph_auth_destroy(monc->auth);
1187 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1189 ceph_msg_put(monc->m_auth);
1190 ceph_msg_put(monc->m_auth_reply);
1191 ceph_msg_put(monc->m_subscribe);
1192 ceph_msg_put(monc->m_subscribe_ack);
1194 kfree(monc->monmap);
1198 static void finish_hunting(struct ceph_mon_client *monc)
1200 if (monc->hunting) {
1201 dout("%s found mon%d\n", __func__, monc->cur_mon);
1202 monc->hunting = false;
1203 monc->had_a_connection = true;
1204 un_backoff(monc);
1205 __schedule_delayed(monc);
1209 static void handle_auth_reply(struct ceph_mon_client *monc,
1215 mutex_lock(&monc->mutex);
1216 was_auth = ceph_auth_is_authenticated(monc->auth);
1217 monc->pending_auth = 0;
1218 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1220 monc->m_auth->front.iov_base,
1221 monc->m_auth->front_alloc_len);
1223 __send_prepared_auth_request(monc, ret);
1227 finish_hunting(monc);
1230 monc->client->auth_err = ret;
1231 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
1234 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1235 monc->client->msgr.inst.name.num =
1236 cpu_to_le64(monc->auth->global_id);
1238 __send_subscribe(monc);
1239 __resend_generic_request(monc);
1241 pr_info("mon%d %s session established\n", monc->cur_mon,
1242 ceph_pr_addr(&monc->con.peer_addr));
1246 mutex_unlock(&monc->mutex);
1247 if (monc->client->auth_err < 0)
1248 wake_up_all(&monc->client->auth_wq);
1251 static int __validate_auth(struct ceph_mon_client *monc)
1255 if (monc->pending_auth)
1258 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1259 monc->m_auth->front_alloc_len);
1262 __send_prepared_auth_request(monc, ret);
1266 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1270 mutex_lock(&monc->mutex);
1271 ret = __validate_auth(monc);
1272 mutex_unlock(&monc->mutex);
1282 struct ceph_mon_client *monc = con->private;
1287 handle_auth_reply(monc, msg);
1291 handle_subscribe_ack(monc, msg);
1295 handle_statfs_reply(monc, msg);
1299 handle_get_version_reply(monc, msg);
1303 handle_command_ack(monc, msg);
1307 ceph_monc_handle_map(monc, msg);
1311 ceph_osdc_handle_map(&monc->client->osdc, msg);
1316 if (monc->client->extra_mon_dispatch &&
1317 monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1333 struct ceph_mon_client *monc = con->private;
1342 m = ceph_msg_get(monc->m_subscribe_ack);
1348 m = ceph_msg_get(monc->m_auth_reply);
1391 struct ceph_mon_client *monc = con->private;
1393 mutex_lock(&monc->mutex);
1394 dout("%s mon%d\n", __func__, monc->cur_mon);
1395 if (monc->cur_mon >= 0) {
1396 if (!monc->hunting) {
1398 reopen_session(monc);
1399 __schedule_delayed(monc);
1404 mutex_unlock(&monc->mutex);