@@ -1426,6 +1426,40 @@ static void __send_queued(struct ceph_osd_client *osdc)
1426
1426
__send_request (osdc , req );
1427
1427
}
1428
1428
1429
+ /*
1430
+ * Caller should hold map_sem for read and request_mutex.
1431
+ */
1432
+ static int __ceph_osdc_start_request (struct ceph_osd_client * osdc ,
1433
+ struct ceph_osd_request * req ,
1434
+ bool nofail )
1435
+ {
1436
+ int rc ;
1437
+
1438
+ __register_request (osdc , req );
1439
+ req -> r_sent = 0 ;
1440
+ req -> r_got_reply = 0 ;
1441
+ rc = __map_request (osdc , req , 0 );
1442
+ if (rc < 0 ) {
1443
+ if (nofail ) {
1444
+ dout ("osdc_start_request failed map, "
1445
+ " will retry %lld\n" , req -> r_tid );
1446
+ rc = 0 ;
1447
+ } else {
1448
+ __unregister_request (osdc , req );
1449
+ }
1450
+ return rc ;
1451
+ }
1452
+
1453
+ if (req -> r_osd == NULL ) {
1454
+ dout ("send_request %p no up osds in pg\n" , req );
1455
+ ceph_monc_request_next_osdmap (& osdc -> client -> monc );
1456
+ } else {
1457
+ __send_queued (osdc );
1458
+ }
1459
+
1460
+ return 0 ;
1461
+ }
1462
+
1429
1463
/*
1430
1464
* Timeout callback, called every N seconds when 1 or more osd
1431
1465
* requests has been active for more than N seconds. When this
@@ -1653,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1653
1687
osdmap_epoch = ceph_decode_32 (& p );
1654
1688
1655
1689
/* lookup */
1690
+ down_read (& osdc -> map_sem );
1656
1691
mutex_lock (& osdc -> request_mutex );
1657
1692
req = __lookup_request (osdc , tid );
1658
1693
if (req == NULL ) {
@@ -1709,7 +1744,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1709
1744
dout ("redirect pool %lld\n" , redir .oloc .pool );
1710
1745
1711
1746
__unregister_request (osdc , req );
1712
- mutex_unlock (& osdc -> request_mutex );
1713
1747
1714
1748
req -> r_target_oloc = redir .oloc ; /* struct */
1715
1749
@@ -1721,10 +1755,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1721
1755
* successfully. In the future we might want to follow
1722
1756
* original request's nofail setting here.
1723
1757
*/
1724
- err = ceph_osdc_start_request (osdc , req , true);
1758
+ err = __ceph_osdc_start_request (osdc , req , true);
1725
1759
BUG_ON (err );
1726
1760
1727
- goto done ;
1761
+ goto out_unlock ;
1728
1762
}
1729
1763
1730
1764
already_completed = req -> r_got_reply ;
@@ -1742,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1742
1776
req -> r_got_reply = 1 ;
1743
1777
} else if ((flags & CEPH_OSD_FLAG_ONDISK ) == 0 ) {
1744
1778
dout ("handle_reply tid %llu dup ack\n" , tid );
1745
- mutex_unlock (& osdc -> request_mutex );
1746
- goto done ;
1779
+ goto out_unlock ;
1747
1780
}
1748
1781
1749
1782
dout ("handle_reply tid %llu flags %d\n" , tid , flags );
@@ -1758,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1758
1791
__unregister_request (osdc , req );
1759
1792
1760
1793
mutex_unlock (& osdc -> request_mutex );
1794
+ up_read (& osdc -> map_sem );
1761
1795
1762
1796
if (!already_completed ) {
1763
1797
if (req -> r_unsafe_callback &&
@@ -1775,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1775
1809
complete_request (req );
1776
1810
}
1777
1811
1778
- done :
1812
+ out :
1779
1813
dout ("req=%p req->r_linger=%d\n" , req , req -> r_linger );
1780
1814
ceph_osdc_put_request (req );
1781
1815
return ;
1816
+ out_unlock :
1817
+ mutex_unlock (& osdc -> request_mutex );
1818
+ up_read (& osdc -> map_sem );
1819
+ goto out ;
1782
1820
1783
1821
bad_put :
1784
1822
req -> r_result = - EIO ;
@@ -1791,6 +1829,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1791
1829
ceph_osdc_put_request (req );
1792
1830
bad_mutex :
1793
1831
mutex_unlock (& osdc -> request_mutex );
1832
+ up_read (& osdc -> map_sem );
1794
1833
bad :
1795
1834
pr_err ("corrupt osd_op_reply got %d %d\n" ,
1796
1835
(int )msg -> front .iov_len , le32_to_cpu (msg -> hdr .front_len ));
@@ -2351,34 +2390,16 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
2351
2390
struct ceph_osd_request * req ,
2352
2391
bool nofail )
2353
2392
{
2354
- int rc = 0 ;
2393
+ int rc ;
2355
2394
2356
2395
down_read (& osdc -> map_sem );
2357
2396
mutex_lock (& osdc -> request_mutex );
2358
- __register_request (osdc , req );
2359
- req -> r_sent = 0 ;
2360
- req -> r_got_reply = 0 ;
2361
- rc = __map_request (osdc , req , 0 );
2362
- if (rc < 0 ) {
2363
- if (nofail ) {
2364
- dout ("osdc_start_request failed map, "
2365
- " will retry %lld\n" , req -> r_tid );
2366
- rc = 0 ;
2367
- } else {
2368
- __unregister_request (osdc , req );
2369
- }
2370
- goto out_unlock ;
2371
- }
2372
- if (req -> r_osd == NULL ) {
2373
- dout ("send_request %p no up osds in pg\n" , req );
2374
- ceph_monc_request_next_osdmap (& osdc -> client -> monc );
2375
- } else {
2376
- __send_queued (osdc );
2377
- }
2378
- rc = 0 ;
2379
- out_unlock :
2397
+
2398
+ rc = __ceph_osdc_start_request (osdc , req , nofail );
2399
+
2380
2400
mutex_unlock (& osdc -> request_mutex );
2381
2401
up_read (& osdc -> map_sem );
2402
+
2382
2403
return rc ;
2383
2404
}
2384
2405
EXPORT_SYMBOL (ceph_osdc_start_request );
@@ -2504,9 +2525,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
2504
2525
err = - ENOMEM ;
2505
2526
osdc -> notify_wq = create_singlethread_workqueue ("ceph-watch-notify" );
2506
2527
if (!osdc -> notify_wq )
2507
- goto out_msgpool ;
2528
+ goto out_msgpool_reply ;
2529
+
2508
2530
return 0 ;
2509
2531
2532
+ out_msgpool_reply :
2533
+ ceph_msgpool_destroy (& osdc -> msgpool_op_reply );
2510
2534
out_msgpool :
2511
2535
ceph_msgpool_destroy (& osdc -> msgpool_op );
2512
2536
out_mempool :
0 commit comments