@@ -22,6 +22,9 @@ class RocksDbDataSource(
22
22
23
23
private val logger = LoggerFactory .getLogger(" rocks-db" )
24
24
25
+ @ volatile
26
+ private var isClosed = false
27
+
25
28
/**
26
29
* This function obtains the associated value to a key, if there exists one.
27
30
*
@@ -30,6 +33,7 @@ class RocksDbDataSource(
30
33
* @return the value associated with the passed key.
31
34
*/
32
35
override def get (namespace : Namespace , key : Key ): Option [Value ] = {
36
+ assureNotClosed()
33
37
RocksDbDataSource .dbLock.readLock().lock()
34
38
try {
35
39
Option (db.get(handles(namespace), readOptions, key.toArray))
@@ -51,6 +55,7 @@ class RocksDbDataSource(
51
55
* @return the value associated with the passed key.
52
56
*/
53
57
override def getOptimized (key : Array [Byte ]): Option [Array [Byte ]] = {
58
+ assureNotClosed()
54
59
RocksDbDataSource .dbLock.readLock().lock()
55
60
try {
56
61
Option (db.get(readOptions, key))
@@ -73,6 +78,7 @@ class RocksDbDataSource(
73
78
* @return the new DataSource after the removals and insertions were done.
74
79
*/
75
80
override def update (namespace : Namespace , toRemove : Seq [Key ], toUpsert : Seq [(Key , Value )]): DataSource = {
81
+ assureNotClosed()
76
82
RocksDbDataSource .dbLock.readLock().lock()
77
83
try {
78
84
withResources(new WriteOptions ()){ writeOptions =>
@@ -104,6 +110,7 @@ class RocksDbDataSource(
104
110
* @return the new DataSource after the removals and insertions were done.
105
111
*/
106
112
override def updateOptimized (toRemove : Seq [Array [Byte ]], toUpsert : Seq [(Array [Byte ], Array [Byte ])]): DataSource = {
113
+ assureNotClosed()
107
114
RocksDbDataSource .dbLock.readLock().lock()
108
115
try {
109
116
withResources(new WriteOptions ()){ writeOptions =>
@@ -132,12 +139,16 @@ class RocksDbDataSource(
132
139
override def clear : DataSource = {
133
140
destroy()
134
141
logger.debug(s " About to create new DataSource for path: ${ rocksDbConfig.path }" )
135
- val (newDb, handles, readOptions, dbOptions, cfOptions) = RocksDbDataSource .createDB(rocksDbConfig, nameSpaces)
142
+ val (newDb, handles, readOptions, dbOptions, cfOptions) = RocksDbDataSource .createDB(rocksDbConfig, nameSpaces.tail)
143
+
144
+ assert(nameSpaces.size == handles.size)
145
+
136
146
this .db = newDb
137
147
this .readOptions = readOptions
138
- this .handles = nameSpaces.zip(handles).toMap
148
+ this .handles = nameSpaces.zip(handles.toList ).toMap
139
149
this .dbOptions = dbOptions
140
150
this .cfOptions = cfOptions
151
+ this .isClosed = false
141
152
this
142
153
}
143
154
@@ -146,6 +157,8 @@ class RocksDbDataSource(
146
157
*/
147
158
override def close (): Unit = {
148
159
logger.debug(s " About to close DataSource in path: ${ rocksDbConfig.path }" )
160
+ assureNotClosed()
161
+ isClosed = true
149
162
RocksDbDataSource .dbLock.writeLock().lock()
150
163
try {
151
164
// There is specific order for closing rocksdb with column families descibed in
@@ -172,7 +185,9 @@ class RocksDbDataSource(
172
185
*/
173
186
override def destroy (): Unit = {
174
187
try {
175
- close()
188
+ if (! isClosed) {
189
+ close()
190
+ }
176
191
} finally {
177
192
import rocksDbConfig ._
178
193
@@ -198,6 +213,13 @@ class RocksDbDataSource(
198
213
options.close()
199
214
}
200
215
}
216
+
217
+ private def assureNotClosed (): Unit = {
218
+ if (isClosed) {
219
+ throw new IllegalStateException (s " This ${getClass.getSimpleName} has been closed " )
220
+ }
221
+ }
222
+
201
223
}
202
224
203
225
trait RocksDbConfig {
@@ -273,6 +295,8 @@ object RocksDbDataSource {
273
295
val (db, handles, readOptions, dbOptions, cfOptions) = createDB(rocksDbConfig, namespaces)
274
296
assert(allNameSpaces.size == handles.size)
275
297
val handlesMap = allNameSpaces.zip(handles.toList).toMap
298
+ // This assert ensures that we do not have duplicated namespaces
299
+ assert(handlesMap.size == handles.size)
276
300
new RocksDbDataSource (db, rocksDbConfig, readOptions, dbOptions, cfOptions, allNameSpaces, handlesMap)
277
301
}
278
302
}
0 commit comments