Skip to content

Commit cf37b59

Browse files
committed
rxrpc: Move DATA transmission into call processor work item
Move DATA transmission into the call processor work item. In a future patch, this will be called from the I/O thread rather than being its own work item. This will allow DATA transmission to be driven directly by incoming ACKs, pokes and timers as those are processed. The Tx queue is also split: The queue of packets prepared by sendmsg is now placed in call->tx_sendmsg and the packet dispatcher decants the packets into call->tx_buffer as space becomes available in the transmission window. This allows sendmsg to run ahead of the available space to try and prevent an underflow in transmission. Signed-off-by: David Howells <[email protected]> cc: Marc Dionne <[email protected]> cc: [email protected]
1 parent f3441d4 commit cf37b59

File tree

7 files changed

+161
-80
lines changed

7 files changed

+161
-80
lines changed

include/trace/events/rxrpc.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@
183183
EM(rxrpc_call_queue_requeue, "QUE requeue ") \
184184
EM(rxrpc_call_queue_resend, "QUE resend ") \
185185
EM(rxrpc_call_queue_timer, "QUE timer ") \
186+
EM(rxrpc_call_queue_tx_data, "QUE tx-data ") \
186187
EM(rxrpc_call_see_accept, "SEE accept ") \
187188
EM(rxrpc_call_see_activate_client, "SEE act-clnt") \
188189
EM(rxrpc_call_see_connect_failed, "SEE con-fail") \
@@ -738,6 +739,7 @@ TRACE_EVENT(rxrpc_txqueue,
738739
__field(rxrpc_seq_t, acks_hard_ack )
739740
__field(rxrpc_seq_t, tx_bottom )
740741
__field(rxrpc_seq_t, tx_top )
742+
__field(rxrpc_seq_t, tx_prepared )
741743
__field(int, tx_winsize )
742744
),
743745

@@ -747,16 +749,18 @@ TRACE_EVENT(rxrpc_txqueue,
747749
__entry->acks_hard_ack = call->acks_hard_ack;
748750
__entry->tx_bottom = call->tx_bottom;
749751
__entry->tx_top = call->tx_top;
752+
__entry->tx_prepared = call->tx_prepared;
750753
__entry->tx_winsize = call->tx_winsize;
751754
),
752755

753-
TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u",
756+
TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u/%u",
754757
__entry->call,
755758
__print_symbolic(__entry->why, rxrpc_txqueue_traces),
756759
__entry->tx_bottom,
757760
__entry->acks_hard_ack,
758761
__entry->tx_top - __entry->tx_bottom,
759762
__entry->tx_top - __entry->acks_hard_ack,
763+
__entry->tx_prepared - __entry->tx_bottom,
760764
__entry->tx_winsize)
761765
);
762766

net/rxrpc/ar-internal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,9 +646,11 @@ struct rxrpc_call {
646646

647647
/* Transmitted data tracking. */
648648
spinlock_t tx_lock; /* Transmit queue lock */
649+
struct list_head tx_sendmsg; /* Sendmsg prepared packets */
649650
struct list_head tx_buffer; /* Buffer of transmissible packets */
650651
rxrpc_seq_t tx_bottom; /* First packet in buffer */
651652
rxrpc_seq_t tx_transmitted; /* Highest packet transmitted */
653+
rxrpc_seq_t tx_prepared; /* Highest Tx slot prepared. */
652654
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
653655
u16 tx_backoff; /* Delay to insert due to Tx failure */
654656
u8 tx_winsize; /* Maximum size of Tx window */
@@ -766,7 +768,7 @@ struct rxrpc_send_params {
766768
*/
767769
struct rxrpc_txbuf {
768770
struct rcu_head rcu;
769-
struct list_head call_link; /* Link in call->tx_queue */
771+
struct list_head call_link; /* Link in call->tx_sendmsg/tx_buffer */
770772
struct list_head tx_link; /* Link in live Enc queue or Tx queue */
771773
struct rxrpc_call *call; /* Call to which belongs */
772774
ktime_t last_sent; /* Time at which last transmitted */
@@ -1067,6 +1069,7 @@ int rxrpc_send_abort_packet(struct rxrpc_call *);
10671069
int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
10681070
void rxrpc_reject_packets(struct rxrpc_local *);
10691071
void rxrpc_send_keepalive(struct rxrpc_peer *);
1072+
void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
10701073

10711074
/*
10721075
* peer_event.c

net/rxrpc/call_event.c

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,72 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
291291
_leave("");
292292
}
293293

294+
static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
295+
{
296+
unsigned int winsize = min_t(unsigned int, call->tx_winsize,
297+
call->cong_cwnd + call->cong_extra);
298+
rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize;
299+
rxrpc_seq_t tx_top = call->tx_top;
300+
int space;
301+
302+
space = wtop - tx_top;
303+
return space > 0;
304+
}
305+
306+
/*
307+
* Decant some of the sendmsg prepared queue into the transmission buffer.
308+
*/
309+
static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
310+
{
311+
struct rxrpc_txbuf *txb;
312+
313+
if (rxrpc_is_client_call(call) &&
314+
!test_bit(RXRPC_CALL_EXPOSED, &call->flags))
315+
rxrpc_expose_client_call(call);
316+
317+
while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
318+
struct rxrpc_txbuf, call_link))) {
319+
spin_lock(&call->tx_lock);
320+
list_del(&txb->call_link);
321+
spin_unlock(&call->tx_lock);
322+
323+
call->tx_top = txb->seq;
324+
list_add_tail(&txb->call_link, &call->tx_buffer);
325+
326+
rxrpc_transmit_one(call, txb);
327+
328+
// TODO: Drain the transmission buffers. Do this somewhere better
329+
if (after(call->acks_hard_ack, call->tx_bottom + 16))
330+
rxrpc_shrink_call_tx_buffer(call);
331+
332+
if (!rxrpc_tx_window_has_space(call))
333+
break;
334+
}
335+
}
336+
337+
static void rxrpc_transmit_some_data(struct rxrpc_call *call)
338+
{
339+
switch (call->state) {
340+
case RXRPC_CALL_SERVER_ACK_REQUEST:
341+
if (list_empty(&call->tx_sendmsg))
342+
return;
343+
fallthrough;
344+
345+
case RXRPC_CALL_SERVER_SEND_REPLY:
346+
case RXRPC_CALL_SERVER_AWAIT_ACK:
347+
case RXRPC_CALL_CLIENT_SEND_REQUEST:
348+
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
349+
if (!rxrpc_tx_window_has_space(call))
350+
return;
351+
if (list_empty(&call->tx_sendmsg))
352+
return;
353+
rxrpc_decant_prepared_tx(call);
354+
break;
355+
default:
356+
return;
357+
}
358+
}
359+
294360
/*
295361
* Handle retransmission and deferred ACK/abort generation.
296362
*/
@@ -309,19 +375,22 @@ void rxrpc_process_call(struct work_struct *work)
309375
call->debug_id, rxrpc_call_states[call->state], call->events);
310376

311377
recheck_state:
378+
if (call->acks_hard_ack != call->tx_bottom)
379+
rxrpc_shrink_call_tx_buffer(call);
380+
312381
/* Limit the number of times we do this before returning to the manager */
313-
iterations++;
314-
if (iterations > 5)
315-
goto requeue;
382+
if (!rxrpc_tx_window_has_space(call) ||
383+
list_empty(&call->tx_sendmsg)) {
384+
iterations++;
385+
if (iterations > 5)
386+
goto requeue;
387+
}
316388

317389
if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
318390
rxrpc_send_abort_packet(call);
319391
goto recheck_state;
320392
}
321393

322-
if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom)
323-
rxrpc_shrink_call_tx_buffer(call);
324-
325394
if (call->state == RXRPC_CALL_COMPLETE) {
326395
del_timer_sync(&call->timer);
327396
goto out;
@@ -387,6 +456,8 @@ void rxrpc_process_call(struct work_struct *work)
387456
set_bit(RXRPC_CALL_EV_RESEND, &call->events);
388457
}
389458

459+
rxrpc_transmit_some_data(call);
460+
390461
/* Process events */
391462
if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
392463
if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&

net/rxrpc/call_object.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
156156
INIT_LIST_HEAD(&call->recvmsg_link);
157157
INIT_LIST_HEAD(&call->sock_link);
158158
INIT_LIST_HEAD(&call->attend_link);
159+
INIT_LIST_HEAD(&call->tx_sendmsg);
159160
INIT_LIST_HEAD(&call->tx_buffer);
160161
skb_queue_head_init(&call->recvmsg_queue);
161162
skb_queue_head_init(&call->rx_oos_queue);
@@ -641,6 +642,11 @@ static void rxrpc_destroy_call(struct work_struct *work)
641642
del_timer_sync(&call->timer);
642643

643644
rxrpc_cleanup_ring(call);
645+
while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
646+
struct rxrpc_txbuf, call_link))) {
647+
list_del(&txb->call_link);
648+
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
649+
}
644650
while ((txb = list_first_entry_or_null(&call->tx_buffer,
645651
struct rxrpc_txbuf, call_link))) {
646652
list_del(&txb->call_link);

net/rxrpc/output.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,14 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
465465

466466
trace_rxrpc_tx_data(call, txb->seq, serial, txb->wire.flags,
467467
test_bit(RXRPC_TXBUF_RESENT, &txb->flags), false);
468+
469+
/* Track what we've attempted to transmit at least once so that the
470+
* retransmission algorithm doesn't try to resend what we haven't sent
471+
* yet. However, this can race as we can receive an ACK before we get
472+
* to this point. But, OTOH, we won't get an ACK mentioning this
473+
* packet unless the far side received it (though it could have
474+
* discarded it anyway and NAK'd it).
475+
*/
468476
cmpxchg(&call->tx_transmitted, txb->seq - 1, txb->seq);
469477

470478
/* send the packet with the don't fragment bit set if we currently
@@ -712,3 +720,43 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
712720
peer->last_tx_at = ktime_get_seconds();
713721
_leave("");
714722
}
723+
724+
/*
725+
* Schedule an instant Tx resend.
726+
*/
727+
static inline void rxrpc_instant_resend(struct rxrpc_call *call,
728+
struct rxrpc_txbuf *txb)
729+
{
730+
if (call->state < RXRPC_CALL_COMPLETE)
731+
kdebug("resend");
732+
}
733+
734+
/*
735+
* Transmit one packet.
736+
*/
737+
void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
738+
{
739+
int ret;
740+
741+
ret = rxrpc_send_data_packet(call, txb);
742+
if (ret < 0) {
743+
switch (ret) {
744+
case -ENETUNREACH:
745+
case -EHOSTUNREACH:
746+
case -ECONNREFUSED:
747+
rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
748+
0, ret);
749+
break;
750+
default:
751+
_debug("need instant resend %d", ret);
752+
rxrpc_instant_resend(call, txb);
753+
}
754+
} else {
755+
unsigned long now = jiffies;
756+
unsigned long resend_at = now + call->peer->rto_j;
757+
758+
WRITE_ONCE(call->resend_at, resend_at);
759+
rxrpc_reduce_call_timer(call, resend_at, now,
760+
rxrpc_timer_set_for_send);
761+
}
762+
}

net/rxrpc/sendmsg.c

Lines changed: 13 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,9 @@
2222
*/
2323
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
2424
{
25-
unsigned int win_size;
26-
rxrpc_seq_t tx_win = smp_load_acquire(&call->acks_hard_ack);
27-
28-
/* If we haven't transmitted anything for >1RTT, we should reset the
29-
* congestion management state.
30-
*/
31-
if (ktime_before(ktime_add_us(call->tx_last_sent,
32-
call->peer->srtt_us >> 3),
33-
ktime_get_real())) {
34-
if (RXRPC_TX_SMSS > 2190)
35-
win_size = 2;
36-
else if (RXRPC_TX_SMSS > 1095)
37-
win_size = 3;
38-
else
39-
win_size = 4;
40-
win_size += call->cong_extra;
41-
} else {
42-
win_size = min_t(unsigned int, call->tx_winsize,
43-
call->cong_cwnd + call->cong_extra);
44-
}
45-
4625
if (_tx_win)
47-
*_tx_win = tx_win;
48-
return call->tx_top - tx_win < win_size;
26+
*_tx_win = call->tx_bottom;
27+
return call->tx_prepared - call->tx_bottom < 256;
4928
}
5029

5130
/*
@@ -66,11 +45,6 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
6645
if (signal_pending(current))
6746
return sock_intr_errno(*timeo);
6847

69-
if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
70-
rxrpc_shrink_call_tx_buffer(call);
71-
continue;
72-
}
73-
7448
trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
7549
*timeo = schedule_timeout(*timeo);
7650
}
@@ -107,11 +81,6 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
10781
tx_win == tx_start && signal_pending(current))
10882
return -EINTR;
10983

110-
if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
111-
rxrpc_shrink_call_tx_buffer(call);
112-
continue;
113-
}
114-
11584
if (tx_win != tx_start) {
11685
timeout = rtt;
11786
tx_start = tx_win;
@@ -137,11 +106,6 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
137106
if (call->state >= RXRPC_CALL_COMPLETE)
138107
return call->error;
139108

140-
if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
141-
rxrpc_shrink_call_tx_buffer(call);
142-
continue;
143-
}
144-
145109
trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
146110
*timeo = schedule_timeout(*timeo);
147111
}
@@ -207,29 +171,27 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
207171
unsigned long now;
208172
rxrpc_seq_t seq = txb->seq;
209173
bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
210-
int ret;
211174

212175
rxrpc_inc_stat(call->rxnet, stat_tx_data);
213176

214-
ASSERTCMP(seq, ==, call->tx_top + 1);
177+
ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
215178

216179
/* We have to set the timestamp before queueing as the retransmit
217180
* algorithm can see the packet as soon as we queue it.
218181
*/
219182
txb->last_sent = ktime_get_real();
220183

221-
/* Add the packet to the call's output buffer */
222-
rxrpc_get_txbuf(txb, rxrpc_txbuf_get_buffer);
223-
spin_lock(&call->tx_lock);
224-
list_add_tail(&txb->call_link, &call->tx_buffer);
225-
call->tx_top = seq;
226-
spin_unlock(&call->tx_lock);
227-
228184
if (last)
229185
trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
230186
else
231187
trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
232188

189+
/* Add the packet to the call's output buffer */
190+
spin_lock(&call->tx_lock);
191+
list_add_tail(&txb->call_link, &call->tx_sendmsg);
192+
call->tx_prepared = seq;
193+
spin_unlock(&call->tx_lock);
194+
233195
if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
234196
_debug("________awaiting reply/ACK__________");
235197
write_lock_bh(&call->state_lock);
@@ -258,30 +220,11 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
258220
write_unlock_bh(&call->state_lock);
259221
}
260222

261-
if (seq == 1 && rxrpc_is_client_call(call))
262-
rxrpc_expose_client_call(call);
263-
264-
ret = rxrpc_send_data_packet(call, txb);
265-
if (ret < 0) {
266-
switch (ret) {
267-
case -ENETUNREACH:
268-
case -EHOSTUNREACH:
269-
case -ECONNREFUSED:
270-
rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
271-
0, ret);
272-
goto out;
273-
}
274-
} else {
275-
unsigned long now = jiffies;
276-
unsigned long resend_at = now + call->peer->rto_j;
277223

278-
WRITE_ONCE(call->resend_at, resend_at);
279-
rxrpc_reduce_call_timer(call, resend_at, now,
280-
rxrpc_timer_set_for_send);
281-
}
282-
283-
out:
284-
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_trans);
224+
/* Stick the packet on the crypto queue or the transmission queue as
225+
* appropriate.
226+
*/
227+
rxrpc_queue_call(call, rxrpc_call_queue_tx_data);
285228
}
286229

287230
/*

0 commit comments

Comments
 (0)