Skip to content

Commit 1248206

Browse files
committed
Add a handler newtype wrapper so we don't need the annoying self: Arc<Self> everywhere
1 parent e8783f9 commit 1248206

File tree

2 files changed

+52
-45
lines changed

2 files changed

+52
-45
lines changed

src/net_protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl Default for GcState {
5555
#[derive(Debug)]
5656
pub struct Blobs<S> {
5757
rt: LocalPoolHandle,
58-
store: S,
58+
pub(crate) store: S,
5959
events: EventSender,
6060
downloader: Downloader,
6161
batches: tokio::sync::Mutex<BlobBatches>,

src/rpc.rs

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::{
44
io,
5+
ops::Deref,
56
sync::{Arc, Mutex},
67
};
78

@@ -81,15 +82,33 @@ impl<D: crate::store::Store> Blobs<D> {
8182
C: ChannelTypes<RpcService>,
8283
{
8384
use Request::*;
85+
let handler = Handler(self);
8486
match msg {
85-
Blobs(msg) => self.handle_blobs_request(msg, chan).await,
86-
Tags(msg) => self.handle_tags_request(msg, chan).await,
87+
Blobs(msg) => handler.handle_blobs_request(msg, chan).await,
88+
Tags(msg) => handler.handle_tags_request(msg, chan).await,
8789
}
8890
}
91+
}
92+
93+
#[derive(Clone)]
94+
struct Handler<S>(Arc<Blobs<S>>);
95+
96+
impl<S> Deref for Handler<S> {
97+
type Target = Blobs<S>;
98+
99+
fn deref(&self) -> &Self::Target {
100+
&self.0
101+
}
102+
}
103+
104+
impl<D: crate::store::Store> Handler<D> {
105+
fn store(&self) -> &D {
106+
&self.0.store
107+
}
89108

90109
/// Handle a tags request
91-
async fn handle_tags_request<C>(
92-
self: Arc<Self>,
110+
pub async fn handle_tags_request<C>(
111+
self,
93112
msg: proto::tags::Request,
94113
chan: RpcChannel<proto::RpcService, C>,
95114
) -> std::result::Result<(), RpcServerError<C>>
@@ -106,8 +125,8 @@ impl<D: crate::store::Store> Blobs<D> {
106125
}
107126

108127
/// Handle a blobs request
109-
async fn handle_blobs_request<C>(
110-
self: Arc<Self>,
128+
pub async fn handle_blobs_request<C>(
129+
self,
111130
msg: proto::blobs::Request,
112131
chan: RpcChannel<proto::RpcService, C>,
113132
) -> std::result::Result<(), RpcServerError<C>>
@@ -150,7 +169,7 @@ impl<D: crate::store::Store> Blobs<D> {
150169
}
151170
}
152171

153-
async fn blob_status(self: Arc<Self>, msg: BlobStatusRequest) -> RpcResult<BlobStatusResponse> {
172+
async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult<BlobStatusResponse> {
154173
let blobs = self;
155174
let entry = blobs
156175
.store()
@@ -171,7 +190,7 @@ impl<D: crate::store::Store> Blobs<D> {
171190
}))
172191
}
173192

174-
async fn blob_list_impl(self: Arc<Self>, co: &Co<RpcResult<BlobInfo>>) -> io::Result<()> {
193+
async fn blob_list_impl(self, co: &Co<RpcResult<BlobInfo>>) -> io::Result<()> {
175194
use bao_tree::io::fsm::Outboard;
176195

177196
let blobs = self;
@@ -190,7 +209,7 @@ impl<D: crate::store::Store> Blobs<D> {
190209
}
191210

192211
async fn blob_list_incomplete_impl(
193-
self: Arc<Self>,
212+
self,
194213
co: &Co<RpcResult<IncompleteBlobInfo>>,
195214
) -> io::Result<()> {
196215
let blobs = self;
@@ -216,7 +235,7 @@ impl<D: crate::store::Store> Blobs<D> {
216235
}
217236

218237
fn blob_list(
219-
self: Arc<Self>,
238+
self,
220239
_msg: ListRequest,
221240
) -> impl Stream<Item = RpcResult<BlobInfo>> + Send + 'static {
222241
Gen::new(|co| async move {
@@ -227,7 +246,7 @@ impl<D: crate::store::Store> Blobs<D> {
227246
}
228247

229248
fn blob_list_incomplete(
230-
self: Arc<Self>,
249+
self,
231250
_msg: ListIncompleteRequest,
232251
) -> impl Stream<Item = RpcResult<IncompleteBlobInfo>> + Send + 'static {
233252
Gen::new(move |co| async move {
@@ -237,26 +256,23 @@ impl<D: crate::store::Store> Blobs<D> {
237256
})
238257
}
239258

240-
async fn blob_delete_tag(self: Arc<Self>, msg: TagDeleteRequest) -> RpcResult<()> {
259+
async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> {
241260
self.store()
242261
.set_tag(msg.name, None)
243262
.await
244263
.map_err(|e| RpcError::new(&e))?;
245264
Ok(())
246265
}
247266

248-
async fn blob_delete_blob(self: Arc<Self>, msg: DeleteRequest) -> RpcResult<()> {
267+
async fn blob_delete_blob(self, msg: DeleteRequest) -> RpcResult<()> {
249268
self.store()
250269
.delete(vec![msg.hash])
251270
.await
252271
.map_err(|e| RpcError::new(&e))?;
253272
Ok(())
254273
}
255274

256-
fn blob_list_tags(
257-
self: Arc<Self>,
258-
msg: TagListRequest,
259-
) -> impl Stream<Item = TagInfo> + Send + 'static {
275+
fn blob_list_tags(self, msg: TagListRequest) -> impl Stream<Item = TagInfo> + Send + 'static {
260276
tracing::info!("blob_list_tags");
261277
let blobs = self;
262278
Gen::new(|co| async move {
@@ -274,7 +290,7 @@ impl<D: crate::store::Store> Blobs<D> {
274290

275291
/// Invoke validate on the database and stream out the result
276292
fn blob_validate(
277-
self: Arc<Self>,
293+
self,
278294
msg: ValidateRequest,
279295
) -> impl Stream<Item = ValidateProgress> + Send + 'static {
280296
let (tx, rx) = async_channel::bounded(1);
@@ -296,7 +312,7 @@ impl<D: crate::store::Store> Blobs<D> {
296312

297313
/// Invoke validate on the database and stream out the result
298314
fn blob_consistency_check(
299-
self: Arc<Self>,
315+
self,
300316
msg: ConsistencyCheckRequest,
301317
) -> impl Stream<Item = ConsistencyCheckProgress> + Send + 'static {
302318
let (tx, rx) = async_channel::bounded(1);
@@ -316,10 +332,7 @@ impl<D: crate::store::Store> Blobs<D> {
316332
rx
317333
}
318334

319-
fn blob_add_from_path(
320-
self: Arc<Self>,
321-
msg: AddPathRequest,
322-
) -> impl Stream<Item = AddPathResponse> {
335+
fn blob_add_from_path(self, msg: AddPathRequest) -> impl Stream<Item = AddPathResponse> {
323336
// provide a little buffer so that we don't slow down the sender
324337
let (tx, rx) = async_channel::bounded(32);
325338
let tx2 = tx.clone();
@@ -332,7 +345,7 @@ impl<D: crate::store::Store> Blobs<D> {
332345
rx.map(AddPathResponse)
333346
}
334347

335-
async fn tags_set(self: Arc<Self>, msg: TagsSetRequest) -> RpcResult<()> {
348+
async fn tags_set(self, msg: TagsSetRequest) -> RpcResult<()> {
336349
let blobs = self;
337350
blobs
338351
.store()
@@ -354,7 +367,7 @@ impl<D: crate::store::Store> Blobs<D> {
354367
Ok(())
355368
}
356369

357-
async fn tags_create(self: Arc<Self>, msg: TagsCreateRequest) -> RpcResult<Tag> {
370+
async fn tags_create(self, msg: TagsCreateRequest) -> RpcResult<Tag> {
358371
let blobs = self;
359372
let tag = blobs
360373
.store()
@@ -374,10 +387,7 @@ impl<D: crate::store::Store> Blobs<D> {
374387
Ok(tag)
375388
}
376389

377-
fn blob_download(
378-
self: Arc<Self>,
379-
msg: BlobDownloadRequest,
380-
) -> impl Stream<Item = DownloadResponse> {
390+
fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream<Item = DownloadResponse> {
381391
let (sender, receiver) = async_channel::bounded(1024);
382392
let endpoint = self.endpoint().clone();
383393
let progress = AsyncChannelProgressSender::new(sender);
@@ -399,7 +409,7 @@ impl<D: crate::store::Store> Blobs<D> {
399409
receiver.map(DownloadResponse)
400410
}
401411

402-
fn blob_export(self: Arc<Self>, msg: ExportRequest) -> impl Stream<Item = ExportResponse> {
412+
fn blob_export(self, msg: ExportRequest) -> impl Stream<Item = ExportResponse> {
403413
let (tx, rx) = async_channel::bounded(1024);
404414
let progress = AsyncChannelProgressSender::new(tx);
405415
let rt = self.rt().clone();
@@ -425,7 +435,7 @@ impl<D: crate::store::Store> Blobs<D> {
425435
}
426436

427437
async fn blob_add_from_path0(
428-
self: Arc<Self>,
438+
self,
429439
msg: AddPathRequest,
430440
progress: async_channel::Sender<AddProgress>,
431441
) -> anyhow::Result<()> {
@@ -543,18 +553,15 @@ impl<D: crate::store::Store> Blobs<D> {
543553
Ok(())
544554
}
545555

546-
async fn batch_create_temp_tag(
547-
self: Arc<Self>,
548-
msg: BatchCreateTempTagRequest,
549-
) -> RpcResult<()> {
556+
async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
550557
let blobs = self;
551558
let tag = blobs.store().temp_tag(msg.content);
552559
blobs.batches().await.store(msg.batch, tag);
553560
Ok(())
554561
}
555562

556563
fn batch_add_stream(
557-
self: Arc<Self>,
564+
self,
558565
msg: BatchAddStreamRequest,
559566
stream: impl Stream<Item = BatchAddStreamUpdate> + Send + Unpin + 'static,
560567
) -> impl Stream<Item = BatchAddStreamResponse> {
@@ -572,7 +579,7 @@ impl<D: crate::store::Store> Blobs<D> {
572579
}
573580

574581
fn batch_add_from_path(
575-
self: Arc<Self>,
582+
self,
576583
msg: BatchAddPathRequest,
577584
) -> impl Stream<Item = BatchAddPathResponse> {
578585
// provide a little buffer so that we don't slow down the sender
@@ -590,7 +597,7 @@ impl<D: crate::store::Store> Blobs<D> {
590597
}
591598

592599
async fn batch_add_stream0(
593-
self: Arc<Self>,
600+
self,
594601
msg: BatchAddStreamRequest,
595602
stream: impl Stream<Item = BatchAddStreamUpdate> + Send + Unpin + 'static,
596603
progress: async_channel::Sender<BatchAddStreamResponse>,
@@ -624,7 +631,7 @@ impl<D: crate::store::Store> Blobs<D> {
624631
}
625632

626633
async fn batch_add_from_path0(
627-
self: Arc<Self>,
634+
self,
628635
msg: BatchAddPathRequest,
629636
progress: async_channel::Sender<BatchAddPathProgress>,
630637
) -> anyhow::Result<()> {
@@ -664,7 +671,7 @@ impl<D: crate::store::Store> Blobs<D> {
664671
}
665672

666673
fn blob_add_stream(
667-
self: Arc<Self>,
674+
self,
668675
msg: AddStreamRequest,
669676
stream: impl Stream<Item = AddStreamUpdate> + Send + Unpin + 'static,
670677
) -> impl Stream<Item = AddStreamResponse> {
@@ -681,7 +688,7 @@ impl<D: crate::store::Store> Blobs<D> {
681688
}
682689

683690
async fn blob_add_stream0(
684-
self: Arc<Self>,
691+
self,
685692
msg: AddStreamRequest,
686693
stream: impl Stream<Item = AddStreamUpdate> + Send + Unpin + 'static,
687694
progress: async_channel::Sender<AddProgress>,
@@ -735,7 +742,7 @@ impl<D: crate::store::Store> Blobs<D> {
735742
}
736743

737744
fn blob_read_at(
738-
self: Arc<Self>,
745+
self,
739746
req: ReadAtRequest,
740747
) -> impl Stream<Item = RpcResult<ReadAtResponse>> + Send + 'static {
741748
let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP);
@@ -816,7 +823,7 @@ impl<D: crate::store::Store> Blobs<D> {
816823
}
817824

818825
fn batch_create(
819-
self: Arc<Self>,
826+
self,
820827
_: BatchCreateRequest,
821828
mut updates: impl Stream<Item = BatchUpdate> + Send + Unpin + 'static,
822829
) -> impl Stream<Item = BatchCreateResponse> {
@@ -842,7 +849,7 @@ impl<D: crate::store::Store> Blobs<D> {
842849
}
843850

844851
async fn create_collection(
845-
self: Arc<Self>,
852+
self,
846853
req: CreateCollectionRequest,
847854
) -> RpcResult<CreateCollectionResponse> {
848855
let CreateCollectionRequest {

0 commit comments

Comments
 (0)