Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 895112c

Browse files
tomusdrwrphmeier
authored andcommitted
Time-based transaction banning (#594) (#602)
* Allow replacing transactions. * Clear old transactions and ban them temporarily. * Move to a separate module and add some tests. * Add bound to banned transactions. * Remove unnecessary block and double PoolRotator.
1 parent c10e512 commit 895112c

File tree

3 files changed

+239
-19
lines changed

3 files changed

+239
-19
lines changed

substrate/extrinsic-pool/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod watcher;
3737
mod error;
3838
mod listener;
3939
mod pool;
40+
mod rotator;
4041

4142
pub use listener::Listener;
4243
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};

substrate/extrinsic-pool/src/pool.rs

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,21 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
1616

17-
use std::{ collections::HashMap, fmt, sync::Arc, collections::BTreeMap};
17+
use std::{
18+
collections::{BTreeMap, HashMap},
19+
fmt,
20+
sync::Arc,
21+
time,
22+
};
1823
use futures::sync::mpsc;
1924
use parking_lot::{Mutex, RwLock};
2025
use serde::{Serialize, de::DeserializeOwned};
2126
use txpool::{self, Scoring, Readiness};
2227

28+
use error::IntoPoolError;
2329
use listener::Listener;
30+
use rotator::PoolRotator;
2431
use watcher::Watcher;
25-
use error::IntoPoolError;
2632

2733
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
2834

@@ -40,16 +46,18 @@ pub type AllExtrinsics<A> = BTreeMap<<<A as ChainApi>::VEx as txpool::VerifiedTr
4046

4147
/// Verified extrinsic struct. Wraps original extrinsic and verification info.
4248
#[derive(Debug)]
43-
pub struct Verified<Ex: ::std::fmt::Debug, VEx: txpool::VerifiedTransaction> {
49+
pub struct Verified<Ex, VEx> {
4450
/// Original extrinsic.
4551
pub original: Ex,
4652
/// Verification data.
4753
pub verified: VEx,
54+
/// Pool deadline, after it's reached we remove the extrinsic from the pool.
55+
pub valid_till: time::Instant,
4856
}
4957

50-
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
51-
where
52-
Ex: ::std::fmt::Debug,
58+
impl<Ex, VEx> txpool::VerifiedTransaction for Verified<Ex, VEx>
59+
where
60+
Ex: fmt::Debug,
5361
VEx: txpool::VerifiedTransaction,
5462
{
5563
type Hash = <VEx as txpool::VerifiedTransaction>::Hash;
@@ -118,10 +126,17 @@ pub struct Ready<'a, 'b, B: 'a + ChainApi> {
118126
api: &'a B,
119127
at: &'b BlockId<B::Block>,
120128
context: B::Ready,
129+
rotator: &'a PoolRotator<B::Hash>,
130+
now: time::Instant,
121131
}
122132

123133
impl<'a, 'b, B: ChainApi> txpool::Ready<VerifiedFor<B>> for Ready<'a, 'b, B> {
124134
fn is_ready(&mut self, xt: &VerifiedFor<B>) -> Readiness {
135+
if self.rotator.ban_if_stale(&self.now, xt) {
136+
debug!(target: "extrinsic-pool", "[{:?}] Banning as stale.", txpool::VerifiedTransaction::hash(xt));
137+
return Readiness::Stale;
138+
}
139+
125140
self.api.is_ready(self.at, &mut self.context, xt)
126141
}
127142
}
@@ -155,6 +170,11 @@ impl<T: ChainApi> Scoring<VerifiedFor<T>> for ScoringAdapter<T> {
155170
}
156171
}
157172

173+
/// Maximum time the transaction will be kept in the pool.
174+
///
175+
/// Transactions that don't get included within the limit are removed from the pool.
176+
const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
177+
158178
/// Extrinsics pool.
159179
pub struct Pool<B: ChainApi> {
160180
api: B,
@@ -164,6 +184,7 @@ pub struct Pool<B: ChainApi> {
164184
Listener<B::Hash>,
165185
>>,
166186
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
187+
rotator: PoolRotator<B::Hash>,
167188
}
168189

169190
impl<B: ChainApi> Pool<B> {
@@ -173,6 +194,7 @@ impl<B: ChainApi> Pool<B> {
173194
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
174195
import_notification_sinks: Default::default(),
175196
api,
197+
rotator: Default::default(),
176198
}
177199
}
178200

@@ -206,19 +228,28 @@ impl<B: ChainApi> Pool<B> {
206228
{
207229
xts
208230
.into_iter()
209-
.map(|xt| (self.api.verify_transaction(at, &xt), xt))
231+
.map(|xt| {
232+
match self.api.verify_transaction(at, &xt) {
233+
Ok(ref verified) if self.rotator.is_banned(txpool::VerifiedTransaction::hash(verified)) => {
234+
return (Err(txpool::Error::from("Temporarily Banned".to_owned()).into()), xt)
235+
},
236+
result => (result, xt),
237+
}
238+
})
210239
.map(|(v, xt)| {
211-
let xt = Verified { original: xt, verified: v? };
240+
let xt = Verified {
241+
original: xt,
242+
verified: v?,
243+
valid_till: time::Instant::now() + POOL_TIME,
244+
};
212245
Ok(self.pool.write().import(xt)?)
213246
})
214247
.collect()
215248
}
216249

217250
/// Imports one unverified extrinsic to the pool
218251
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Arc<VerifiedFor<B>>, B::Error> {
219-
let v = self.api.verify_transaction(at, &xt)?;
220-
let xt = Verified { original: xt, verified: v };
221-
Ok(self.pool.write().import(xt)?)
252+
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed"))
222253
}
223254

224255
/// Import a single extrinsic and starts to watch their progress in the pool.
@@ -244,7 +275,8 @@ impl<B: ChainApi> Pool<B> {
244275
senders: Option<&[<B::VEx as txpool::VerifiedTransaction>::Sender]>,
245276
) -> usize
246277
{
247-
let ready = Ready { api: &self.api, context: self.api.ready(), at };
278+
self.rotator.clear_timeouts(&time::Instant::now());
279+
let ready = self.ready(at);
248280
self.pool.write().cull(senders, ready)
249281
}
250282

@@ -284,9 +316,9 @@ impl<B: ChainApi> Pool<B> {
284316
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
285317
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
286318
{
287-
let ready = Ready { api: &self.api, context: self.api.ready(), at };
319+
let ready = self.ready(at);
288320
f(self.pool.read().pending(ready))
289-
}
321+
}
290322

291323
/// Retry to import all verified transactions from given sender.
292324
pub fn retry_verification(&self, at: &BlockId<B::Block>, sender: <B::VEx as txpool::VerifiedTransaction>::Sender) -> Result<(), B::Error> {
@@ -326,6 +358,16 @@ impl<B: ChainApi> Pool<B> {
326358
map
327359
})
328360
}
361+
362+
fn ready<'a, 'b>(&'a self, at: &'b BlockId<B::Block>) -> Ready<'a, 'b, B> {
363+
Ready {
364+
api: &self.api,
365+
rotator: &self.rotator,
366+
context: self.api.ready(),
367+
at,
368+
now: time::Instant::now(),
369+
}
370+
}
329371
}
330372

331373
/// A Readiness implementation that returns `Ready` for all transactions.
@@ -337,7 +379,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady {
337379
}
338380

339381
#[cfg(test)]
340-
mod tests {
382+
pub mod tests {
341383
use txpool;
342384
use super::{VerifiedFor, ExtrinsicFor};
343385
use std::collections::HashMap;
@@ -353,9 +395,9 @@ mod tests {
353395

354396
#[derive(Clone, Debug)]
355397
pub struct VerifiedTransaction {
356-
hash: Hash,
357-
sender: AccountId,
358-
nonce: u64,
398+
pub hash: Hash,
399+
pub sender: AccountId,
400+
pub nonce: u64,
359401
}
360402

361403
impl txpool::VerifiedTransaction for VerifiedTransaction {
@@ -419,7 +461,7 @@ mod tests {
419461

420462
result
421463
}
422-
464+
423465
fn ready(&self) -> Self::Ready {
424466
HashMap::default()
425467
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright 2018 Parity Technologies (UK) Ltd.
2+
// This file is part of Polkadot.
3+
4+
// Polkadot is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Polkadot is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//! Rotate extrinsic inside the pool.
18+
//!
19+
//! Keeps only recent extrinsic and discard the ones kept for a significant amount of time.
20+
//! Discarded extrinsics are banned so that they don't get re-imported again.
21+
22+
use std::{
23+
collections::HashMap,
24+
fmt,
25+
hash,
26+
time::{Duration, Instant},
27+
};
28+
use parking_lot::RwLock;
29+
use txpool::VerifiedTransaction;
30+
use Verified;
31+
32+
/// Expected size of the banned extrinsics cache.
33+
const EXPECTED_SIZE: usize = 2048;
34+
35+
/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
36+
///
37+
/// Extrinsics that occupy the pool for too long are culled and temporarily banned from entering
38+
/// the pool again.
39+
pub struct PoolRotator<Hash> {
40+
/// How long the extrinsic is banned for.
41+
ban_time: Duration,
42+
/// Currently banned extrinsics.
43+
banned_until: RwLock<HashMap<Hash, Instant>>,
44+
}
45+
46+
impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
47+
fn default() -> Self {
48+
PoolRotator {
49+
ban_time: Duration::from_secs(60 * 30),
50+
banned_until: Default::default(),
51+
}
52+
}
53+
}
54+
55+
impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
56+
/// Returns `true` if extrinsic hash is currently banned.
57+
pub fn is_banned(&self, hash: &Hash) -> bool {
58+
self.banned_until.read().contains_key(hash)
59+
}
60+
61+
/// Bans extrinsic if it's stale.
62+
///
63+
/// Returns `true` if extrinsic is stale and got banned.
64+
pub fn ban_if_stale<Ex, VEx>(&self, now: &Instant, xt: &Verified<Ex, VEx>) -> bool where
65+
VEx: VerifiedTransaction<Hash=Hash>,
66+
Hash: fmt::Debug + fmt::LowerHex,
67+
{
68+
if &xt.valid_till > now {
69+
return false;
70+
}
71+
72+
let mut banned = self.banned_until.write();
73+
banned.insert(xt.verified.hash().clone(), *now + self.ban_time);
74+
75+
if banned.len() > 2 * EXPECTED_SIZE {
76+
while banned.len() > EXPECTED_SIZE {
77+
if let Some(key) = banned.keys().next().cloned() {
78+
banned.remove(&key);
79+
}
80+
}
81+
}
82+
83+
true
84+
}
85+
86+
/// Removes timed bans.
87+
pub fn clear_timeouts(&self, now: &Instant) {
88+
let mut banned = self.banned_until.write();
89+
90+
let to_remove = banned
91+
.iter()
92+
.filter_map(|(k, v)| if v < now {
93+
Some(k.clone())
94+
} else {
95+
None
96+
}).collect::<Vec<_>>();
97+
98+
for k in to_remove {
99+
banned.remove(&k);
100+
}
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use super::*;
107+
use pool::tests::VerifiedTransaction;
108+
use test_client::runtime::Hash;
109+
110+
fn rotator() -> PoolRotator<Hash> {
111+
PoolRotator {
112+
ban_time: Duration::from_millis(10),
113+
..Default::default()
114+
}
115+
}
116+
117+
fn tx() -> (Hash, Verified<u64, VerifiedTransaction>) {
118+
let hash = 5.into();
119+
let tx = Verified {
120+
original: 5,
121+
verified: VerifiedTransaction {
122+
hash,
123+
sender: Default::default(),
124+
nonce: Default::default(),
125+
},
126+
valid_till: Instant::now(),
127+
};
128+
129+
(hash, tx)
130+
}
131+
132+
#[test]
133+
fn should_not_ban_if_not_stale() {
134+
// given
135+
let (hash, tx) = tx();
136+
let rotator = rotator();
137+
assert!(!rotator.is_banned(&hash));
138+
let past = Instant::now() - Duration::from_millis(1000);
139+
140+
// when
141+
assert!(!rotator.ban_if_stale(&past, &tx));
142+
143+
// then
144+
assert!(!rotator.is_banned(&hash));
145+
}
146+
147+
#[test]
148+
fn should_ban_stale_extrinsic() {
149+
// given
150+
let (hash, tx) = tx();
151+
let rotator = rotator();
152+
assert!(!rotator.is_banned(&hash));
153+
154+
// when
155+
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
156+
157+
// then
158+
assert!(rotator.is_banned(&hash));
159+
}
160+
161+
162+
#[test]
163+
fn should_clear_banned() {
164+
// given
165+
let (hash, tx) = tx();
166+
let rotator = rotator();
167+
assert!(rotator.ban_if_stale(&Instant::now(), &tx));
168+
assert!(rotator.is_banned(&hash));
169+
170+
// when
171+
let future = Instant::now() + rotator.ban_time + rotator.ban_time;
172+
rotator.clear_timeouts(&future);
173+
174+
// then
175+
assert!(!rotator.is_banned(&hash));
176+
}
177+
}

0 commit comments

Comments
 (0)