Lines Matching refs:osd
41 * are described by the osd map.
49 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51 static void link_linger(struct ceph_osd *osd,
53 static void unlink_linger(struct ceph_osd *osd,
55 static void clear_backoffs(struct ceph_osd *osd);
77 static inline void verify_osd_locked(struct ceph_osd *osd)
79 struct ceph_osd_client *osdc = osd->o_osdc;
81 WARN_ON(!(mutex_is_locked(&osd->lock) &&
92 static inline void verify_osd_locked(struct ceph_osd *osd) { }
101 * fill osd op in request message.
455 t->osd = CEPH_HOMELESS_OSD;
484 dest->osd = src->osd;
736 * This is an osd op init function for opcodes that have no data or
1056 pr_err("unsupported osd opcode %s\n",
1187 * We keep osd requests in an rbtree, sorted by ->r_tid.
1202 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1204 for (p = rb_first(&osd->o_requests); p; ) {
1224 static bool osd_homeless(struct ceph_osd *osd)
1226 return osd->o_osd == CEPH_HOMELESS_OSD;
1229 static bool osd_registered(struct ceph_osd *osd)
1231 verify_osdc_locked(osd->o_osdc);
1233 return !RB_EMPTY_NODE(&osd->o_node);
1237 * Assumes @osd is zero-initialized.
1239 static void osd_init(struct ceph_osd *osd)
1241 refcount_set(&osd->o_ref, 1);
1242 RB_CLEAR_NODE(&osd->o_node);
1243 spin_lock_init(&osd->o_requests_lock);
1244 osd->o_requests = RB_ROOT;
1245 osd->o_linger_requests = RB_ROOT;
1246 osd->o_backoff_mappings = RB_ROOT;
1247 osd->o_backoffs_by_id = RB_ROOT;
1248 INIT_LIST_HEAD(&osd->o_osd_lru);
1249 INIT_LIST_HEAD(&osd->o_keepalive_item);
1250 osd->o_incarnation = 1;
1251 mutex_init(&osd->lock);
1261 static void osd_cleanup(struct ceph_osd *osd)
1263 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1264 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1265 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1266 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1267 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1268 WARN_ON(!list_empty(&osd->o_osd_lru));
1269 WARN_ON(!list_empty(&osd->o_keepalive_item));
1271 ceph_init_sparse_read(&osd->o_sparse_read);
1273 if (osd->o_auth.authorizer) {
1274 WARN_ON(osd_homeless(osd));
1275 ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1284 struct ceph_osd *osd;
1288 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1289 osd_init(osd);
1290 osd->o_osdc = osdc;
1291 osd->o_osd = onum;
1292 osd->o_sparse_op_idx = -1;
1294 ceph_init_sparse_read(&osd->o_sparse_read);
1296 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1298 return osd;
1301 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1303 if (refcount_inc_not_zero(&osd->o_ref)) {
1304 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1305 refcount_read(&osd->o_ref));
1306 return osd;
1308 dout("get_osd %p FAIL\n", osd);
1313 static void put_osd(struct ceph_osd *osd)
1315 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1316 refcount_read(&osd->o_ref) - 1);
1317 if (refcount_dec_and_test(&osd->o_ref)) {
1318 osd_cleanup(osd);
1319 kfree(osd);
1323 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1325 static void __move_osd_to_lru(struct ceph_osd *osd)
1327 struct ceph_osd_client *osdc = osd->o_osdc;
1329 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1330 BUG_ON(!list_empty(&osd->o_osd_lru));
1333 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1336 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1339 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1341 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1342 RB_EMPTY_ROOT(&osd->o_linger_requests))
1343 __move_osd_to_lru(osd);
1346 static void __remove_osd_from_lru(struct ceph_osd *osd)
1348 struct ceph_osd_client *osdc = osd->o_osdc;
1350 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1353 if (!list_empty(&osd->o_osd_lru))
1354 list_del_init(&osd->o_osd_lru);
1362 static void close_osd(struct ceph_osd *osd)
1364 struct ceph_osd_client *osdc = osd->o_osdc;
1368 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1370 ceph_con_close(&osd->o_con);
1372 for (n = rb_first(&osd->o_requests); n; ) {
1379 unlink_request(osd, req);
1382 for (n = rb_first(&osd->o_linger_requests); n; ) {
1390 unlink_linger(osd, lreq);
1393 clear_backoffs(osd);
1395 __remove_osd_from_lru(osd);
1396 erase_osd(&osdc->osds, osd);
1397 put_osd(osd);
1401 * reset osd connect
1403 static int reopen_osd(struct ceph_osd *osd)
1407 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1409 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1410 RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1411 close_osd(osd);
1415 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1416 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1417 !ceph_con_opened(&osd->o_con)) {
1420 dout("osd addr hasn't changed and connection never opened, "
1423 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1432 ceph_con_close(&osd->o_con);
1433 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1434 osd->o_incarnation++;
1442 struct ceph_osd *osd;
1450 osd = lookup_osd(&osdc->osds, o);
1452 osd = &osdc->homeless_osd;
1453 if (!osd) {
1457 osd = create_osd(osdc, o);
1458 insert_osd(&osdc->osds, osd);
1459 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1460 &osdc->osdmap->osd_addr[osd->o_osd]);
1463 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1464 return osd;
1470 * @req has to be assigned a tid, @osd may be homeless.
1472 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1474 verify_osd_locked(osd);
1476 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1479 if (!osd_homeless(osd))
1480 __remove_osd_from_lru(osd);
1482 atomic_inc(&osd->o_osdc->num_homeless);
1484 get_osd(osd);
1485 spin_lock(&osd->o_requests_lock);
1486 insert_request(&osd->o_requests, req);
1487 spin_unlock(&osd->o_requests_lock);
1488 req->r_osd = osd;
1491 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1493 verify_osd_locked(osd);
1494 WARN_ON(req->r_osd != osd);
1495 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1499 spin_lock(&osd->o_requests_lock);
1500 erase_request(&osd->o_requests, req);
1501 spin_unlock(&osd->o_requests_lock);
1502 put_osd(osd);
1504 if (!osd_homeless(osd))
1505 maybe_move_osd_to_lru(osd);
1507 atomic_dec(&osd->o_osdc->num_homeless);
1564 dout("%s picked osd%d, primary osd%d\n", __func__,
1594 dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1626 t->osd = CEPH_HOMELESS_OSD;
1651 t->osd = CEPH_HOMELESS_OSD;
1714 t->osd = acting.osds[pos];
1717 t->osd = acting.primary;
1728 dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1729 legacy_change, force_resend, split, ct_res, t->osd);
1988 static void clear_backoffs(struct ceph_osd *osd)
1990 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1992 rb_entry(rb_first(&osd->o_backoff_mappings),
2001 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
2004 erase_spg_mapping(&osd->o_backoff_mappings, spg);
2035 struct ceph_osd *osd = req->r_osd;
2040 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
2049 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
2050 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2335 struct ceph_osd *osd = req->r_osd;
2337 verify_osd_locked(osd);
2338 WARN_ON(osd->o_osd != req->r_t.osd);
2359 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2362 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2369 req->r_sent = osd->o_incarnation;
2371 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2401 struct ceph_osd *osd;
2415 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2416 if (IS_ERR(osd)) {
2417 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2456 } else if (!osd_homeless(osd)) {
2462 mutex_lock(&osd->lock);
2469 link_request(osd, req);
2474 mutex_unlock(&osd->lock);
2794 WARN_ON(lreq->osd);
2853 * @lreq has to be registered, @osd may be homeless.
2855 static void link_linger(struct ceph_osd *osd,
2858 verify_osd_locked(osd);
2859 WARN_ON(!lreq->linger_id || lreq->osd);
2860 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2861 osd->o_osd, lreq, lreq->linger_id);
2863 if (!osd_homeless(osd))
2864 __remove_osd_from_lru(osd);
2866 atomic_inc(&osd->o_osdc->num_homeless);
2868 get_osd(osd);
2869 insert_linger(&osd->o_linger_requests, lreq);
2870 lreq->osd = osd;
2873 static void unlink_linger(struct ceph_osd *osd,
2876 verify_osd_locked(osd);
2877 WARN_ON(lreq->osd != osd);
2878 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2879 osd->o_osd, lreq, lreq->linger_id);
2881 lreq->osd = NULL;
2882 erase_linger(&osd->o_linger_requests, lreq);
2883 put_osd(osd);
2885 if (!osd_homeless(osd))
2886 maybe_move_osd_to_lru(osd);
2888 atomic_dec(&osd->o_osdc->num_homeless);
3271 link_request(lreq->osd, req);
3278 struct ceph_osd *osd;
3284 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3285 link_linger(osd, lreq);
3318 unlink_linger(lreq->osd, lreq);
3462 * a connection with that osd (from the fault callback).
3465 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3468 for (p = rb_first(&osd->o_requests); p; ) {
3475 dout(" req %p tid %llu on osd%d is laggy\n",
3476 req, req->r_tid, osd->o_osd);
3481 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3482 req->r_tid, osd->o_osd);
3486 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3490 dout(" lreq %p linger_id %llu is served by osd%d\n",
3491 lreq, lreq->linger_id, osd->o_osd);
3501 list_move_tail(&osd->o_keepalive_item, &slow_osds);
3512 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3523 struct ceph_osd *osd = list_first_entry(&slow_osds,
3526 list_del_init(&osd->o_keepalive_item);
3527 ceph_con_keepalive(&osd->o_con);
3541 struct ceph_osd *osd, *nosd;
3545 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3546 if (time_before(jiffies, osd->lru_ttl))
3549 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3550 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3551 close_osd(osd);
3766 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3768 struct ceph_osd_client *osdc = osd->o_osdc;
3779 if (!osd_registered(osd)) {
3780 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3783 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3785 mutex_lock(&osd->lock);
3786 req = lookup_request(&osd->o_requests, tid);
3788 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3820 unlink_request(osd, req);
3821 mutex_unlock(&osd->lock);
3838 unlink_request(osd, req);
3839 mutex_unlock(&osd->lock);
3884 mutex_unlock(&osd->lock);
3893 mutex_unlock(&osd->lock);
3929 struct ceph_osd *osd;
3931 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3932 if (osd != lreq->osd) {
3933 unlink_linger(lreq->osd, lreq);
3934 link_linger(osd, lreq);
3944 static void scan_requests(struct ceph_osd *osd,
3951 struct ceph_osd_client *osdc = osd->o_osdc;
3955 for (n = rb_first(&osd->o_linger_requests); n; ) {
3991 for (n = rb_first(&osd->o_requests); n; ) {
4013 unlink_request(osd, req);
4078 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4082 scan_requests(osd, skipped_map, was_full, true, need_resend,
4084 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
4085 memcmp(&osd->o_con.peer_addr,
4086 ceph_osd_addr(osdc->osdmap, osd->o_osd),
4088 close_osd(osd);
4121 struct ceph_osd *osd;
4126 osd = lookup_create_osd(osdc, req->r_t.osd, true);
4127 link_request(osd, req);
4129 if (!osd_homeless(osd) && !req->r_t.paused)
4137 if (!osd_homeless(lreq->osd))
4145 * Process updated osd map.
4262 * Resubmit requests pending on the given osd.
4264 static void kick_osd_requests(struct ceph_osd *osd)
4268 clear_backoffs(osd);
4270 for (n = rb_first(&osd->o_requests); n; ) {
4283 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4292 * If the osd connection drops, we need to resubmit all requests.
4296 struct ceph_osd *osd = con->private;
4297 struct ceph_osd_client *osdc = osd->o_osdc;
4299 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4302 if (!osd_registered(osd)) {
4303 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4307 if (!reopen_osd(osd))
4308 kick_osd_requests(osd);
4412 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4418 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4421 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4429 insert_spg_mapping(&osd->o_backoff_mappings, spg);
4445 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4456 ceph_con_send(&osd->o_con, msg);
4471 static void handle_backoff_unblock(struct ceph_osd *osd,
4478 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4481 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4483 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4484 __func__, osd->o_osd, m->spgid.pgid.pool,
4491 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4492 __func__, osd->o_osd, m->spgid.pgid.pool,
4497 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4501 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4505 erase_spg_mapping(&osd->o_backoff_mappings, spg);
4509 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4529 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4531 struct ceph_osd_client *osdc = osd->o_osdc;
4536 if (!osd_registered(osd)) {
4537 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4541 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4543 mutex_lock(&osd->lock);
4553 handle_backoff_block(osd, &m);
4556 handle_backoff_unblock(osd, &m);
4559 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4566 mutex_unlock(&osd->lock);
4571 * Process osd watch notifications
4743 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4745 mutex_lock(&osd->lock);
4746 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4757 mutex_unlock(&osd->lock);
4766 mutex_unlock(&osd->lock);
5227 * reset all osd connections
5235 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5238 if (!reopen_osd(osd))
5239 kick_osd_requests(osd);
5326 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5328 close_osd(osd);
5409 struct ceph_osd *osd = con->private;
5410 struct ceph_osd_client *osdc = osd->o_osdc;
5418 handle_reply(osd, msg);
5421 handle_backoff(osd, msg);
5462 struct ceph_osd *osd = con->private;
5463 struct ceph_osd_client *osdc = osd->o_osdc;
5472 if (!osd_registered(osd)) {
5473 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5477 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5479 mutex_lock(&osd->lock);
5480 req = lookup_request(&osd->o_requests, tid);
5482 dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5483 osd->o_osd, tid);
5491 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5492 __func__, osd->o_osd, req->r_tid, front_len,
5504 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5505 __func__, osd->o_osd, req->r_tid, data_len,
5518 mutex_unlock(&osd->lock);
5555 struct ceph_osd *osd = con->private;
5567 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5568 osd->o_osd, type);
5579 struct ceph_osd *osd = con->private;
5580 if (get_osd(osd))
5587 struct ceph_osd *osd = con->private;
5588 put_osd(osd);