1
1
package io .iohk .ethereum .db .dataSource
2
2
3
3
import java .util .concurrent .locks .ReentrantReadWriteLock
4
-
4
+ import io . iohk . ethereum . utils . Logger
5
5
import cats .effect .Resource
6
6
import io .iohk .ethereum .db .dataSource .DataSource ._
7
- import io .iohk .ethereum .db .dataSource .RocksDbDataSource .{ IterationError , IterationFinished }
7
+ import io .iohk .ethereum .db .dataSource .RocksDbDataSource ._
8
8
import io .iohk .ethereum .utils .TryWithResources .withResources
9
9
import monix .eval .Task
10
10
import monix .reactive .Observable
11
11
import org .rocksdb ._
12
- import org .slf4j .LoggerFactory
13
12
14
13
import scala .collection .mutable
15
14
import scala .util .control .NonFatal
@@ -22,9 +21,8 @@ class RocksDbDataSource(
22
21
private var cfOptions : ColumnFamilyOptions ,
23
22
private var nameSpaces : Seq [Namespace ],
24
23
private var handles : Map [Namespace , ColumnFamilyHandle ]
25
- ) extends DataSource {
26
-
27
- private val logger = LoggerFactory .getLogger(" rocks-db" )
24
+ ) extends DataSource
25
+ with Logger {
28
26
29
27
@ volatile
30
28
private var isClosed = false
@@ -37,16 +35,20 @@ class RocksDbDataSource(
37
35
* @return the value associated with the passed key.
38
36
*/
39
37
override def get (namespace : Namespace , key : Key ): Option [Value ] = {
40
- assureNotClosed()
41
- RocksDbDataSource .dbLock.readLock().lock()
38
+ dbLock.readLock().lock()
42
39
try {
40
+ assureNotClosed()
43
41
Option (db.get(handles(namespace), readOptions, key.toArray))
44
42
} catch {
45
- case NonFatal (e) =>
46
- logger.error(s " Not found associated value to a namespace: $namespace and a key: $key, cause: {} " , e.getMessage)
47
- throw new RuntimeException (e)
43
+ case error : RocksDbDataSourceClosedException =>
44
+ throw error
45
+ case NonFatal (error) =>
46
+ throw RocksDbDataSourceException (
47
+ s " Not found associated value to a namespace: $namespace and a key: $key" ,
48
+ error
49
+ )
48
50
} finally {
49
- RocksDbDataSource . dbLock.readLock().unlock()
51
+ dbLock.readLock().unlock()
50
52
}
51
53
}
52
54
@@ -59,23 +61,24 @@ class RocksDbDataSource(
59
61
* @return the value associated with the passed key.
60
62
*/
61
63
override def getOptimized (namespace : Namespace , key : Array [Byte ]): Option [Array [Byte ]] = {
62
- assureNotClosed()
63
- RocksDbDataSource .dbLock.readLock().lock()
64
+ dbLock.readLock().lock()
64
65
try {
66
+ assureNotClosed()
65
67
Option (db.get(handles(namespace), readOptions, key))
66
68
} catch {
67
- case NonFatal (e) =>
68
- logger.error(s " Not found associated value to a key: $key, cause: {} " , e.getMessage)
69
- throw new RuntimeException (e)
69
+ case error : RocksDbDataSourceClosedException =>
70
+ throw error
71
+ case NonFatal (error) =>
72
+ throw RocksDbDataSourceException (s " Not found associated value to a key: $key" , error)
70
73
} finally {
71
- RocksDbDataSource . dbLock.readLock().unlock()
74
+ dbLock.readLock().unlock()
72
75
}
73
76
}
74
77
75
78
override def update (dataSourceUpdates : Seq [DataUpdate ]): Unit = {
76
- assureNotClosed()
77
- RocksDbDataSource .dbLock.readLock().lock()
79
+ dbLock.writeLock().lock()
78
80
try {
81
+ assureNotClosed()
79
82
withResources(new WriteOptions ()) { writeOptions =>
80
83
withResources(new WriteBatch ()) { batch =>
81
84
dataSourceUpdates.foreach {
@@ -95,14 +98,12 @@ class RocksDbDataSource(
95
98
}
96
99
}
97
100
} catch {
98
- case NonFatal (e) =>
99
- logger.error(
100
- s " DataSource not updated, cause: {} " ,
101
- e.getMessage
102
- )
103
- throw new RuntimeException (e)
101
+ case error : RocksDbDataSourceClosedException =>
102
+ throw error
103
+ case NonFatal (error) =>
104
+ throw RocksDbDataSourceException (s " DataSource not updated " , error)
104
105
} finally {
105
- RocksDbDataSource . dbLock.readLock ().unlock()
106
+ dbLock.writeLock ().unlock()
106
107
}
107
108
}
108
109
@@ -139,12 +140,13 @@ class RocksDbDataSource(
139
140
}
140
141
141
142
/**
143
+ * This function is used only for tests.
142
144
* This function updates the DataSource by deleting all the (key-value) pairs in it.
143
145
*/
144
146
override def clear (): Unit = {
145
147
destroy()
146
- logger .debug(s " About to create new DataSource for path: ${rocksDbConfig.path}" )
147
- val (newDb, handles, readOptions, dbOptions, cfOptions) = RocksDbDataSource . createDB(rocksDbConfig, nameSpaces.tail)
148
+ log .debug(s " About to create new DataSource for path: ${rocksDbConfig.path}" )
149
+ val (newDb, handles, readOptions, dbOptions, cfOptions) = createDB(rocksDbConfig, nameSpaces.tail)
148
150
149
151
assert(nameSpaces.size == handles.size)
150
152
@@ -160,11 +162,11 @@ class RocksDbDataSource(
160
162
* This function closes the DataSource, without deleting the files used by it.
161
163
*/
162
164
override def close (): Unit = {
163
- logger.debug(s " About to close DataSource in path: ${rocksDbConfig.path}" )
164
- assureNotClosed()
165
- isClosed = true
166
- RocksDbDataSource .dbLock.writeLock().lock()
165
+ log.info(s " About to close DataSource in path: ${rocksDbConfig.path}" )
166
+ dbLock.writeLock().lock()
167
167
try {
168
+ assureNotClosed()
169
+ isClosed = true
168
170
// There is specific order for closing rocksdb with column families descibed in
169
171
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
170
172
// 1. Free all column families handles
@@ -175,16 +177,19 @@ class RocksDbDataSource(
175
177
dbOptions.close()
176
178
// 3. Free column families options
177
179
cfOptions.close()
180
+ log.info(s " DataSource closed successfully in the path: ${rocksDbConfig.path}" )
178
181
} catch {
179
- case NonFatal (e) =>
180
- logger.error(" Not closed the DataSource properly, cause: {}" , e)
181
- throw new RuntimeException (e)
182
+ case error : RocksDbDataSourceClosedException =>
183
+ throw error
184
+ case NonFatal (error) =>
185
+ throw RocksDbDataSourceException (s " Not closed the DataSource properly " , error)
182
186
} finally {
183
- RocksDbDataSource . dbLock.writeLock().unlock()
187
+ dbLock.writeLock().unlock()
184
188
}
185
189
}
186
190
187
191
/**
192
+ * This function is used only for tests.
188
193
* This function closes the DataSource, if it is not yet closed, and deletes all the files used by it.
189
194
*/
190
195
override def destroy (): Unit = {
@@ -193,8 +198,13 @@ class RocksDbDataSource(
193
198
close()
194
199
}
195
200
} finally {
196
- import rocksDbConfig ._
201
+ destroyDB()
202
+ }
203
+ }
197
204
205
+ protected def destroyDB (): Unit = {
206
+ try {
207
+ import rocksDbConfig ._
198
208
val tableCfg = new BlockBasedTableConfig ()
199
209
.setBlockSize(blockSize)
200
210
.setBlockCache(new ClockCache (blockCacheSize))
@@ -212,15 +222,18 @@ class RocksDbDataSource(
212
222
.setIncreaseParallelism(maxThreads)
213
223
.setTableFormatConfig(tableCfg)
214
224
215
- logger .debug(s " About to destroy DataSource in path: $path" )
225
+ log .debug(s " About to destroy DataSource in path: $path" )
216
226
RocksDB .destroyDB(path, options)
217
227
options.close()
228
+ } catch {
229
+ case NonFatal (error) =>
230
+ throw RocksDbDataSourceException (s " Not destroyed the DataSource properly " , error)
218
231
}
219
232
}
220
233
221
234
private def assureNotClosed (): Unit = {
222
235
if (isClosed) {
223
- throw new IllegalStateException (s " This ${getClass.getSimpleName} has been closed " )
236
+ throw RocksDbDataSourceClosedException (s " This ${getClass.getSimpleName} has been closed " )
224
237
}
225
238
}
226
239
@@ -242,6 +255,9 @@ object RocksDbDataSource {
242
255
case object IterationFinished extends RuntimeException
243
256
case class IterationError (ex : Throwable )
244
257
258
+ case class RocksDbDataSourceClosedException (message : String ) extends IllegalStateException (message)
259
+ case class RocksDbDataSourceException (message : String , cause : Throwable ) extends RuntimeException (message, cause)
260
+
245
261
/**
246
262
* The rocksdb implementation acquires a lock from the operating system to prevent misuse
247
263
*/
@@ -296,6 +312,9 @@ object RocksDbDataSource {
296
312
options,
297
313
cfOpts
298
314
)
315
+ } catch {
316
+ case NonFatal (error) =>
317
+ throw RocksDbDataSourceException (s " Not created the DataSource properly " , error)
299
318
} finally {
300
319
RocksDbDataSource .dbLock.writeLock().unlock()
301
320
}
0 commit comments