Skip to content

Commit 1042870

Browse files
authored
DataRow without allocation; DataRow as Collection; RowDescription top level (#198)
This is a cherry pick of #188. ### Modifications - `DataRow` and `RowDescription` have been moved out of the `PSQLBackendMessage` namespace. This allows us to mark them as `@inlinable` or `@usableFromInline` at a later point, without marking everything in `PSQLBackendMessage` as `@inlinable` - `DataRow` does not use an internal array for its columns anymore. Instead all read operations are directly done on its ByteBuffer slice. - `DataRow` implements the `Collection` protocol now. ### Result One allocation fewer per queried row.
1 parent 9967360 commit 1042870

19 files changed

+367
-193
lines changed

Sources/PostgresNIO/Connection/PostgresConnection+Database.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ extension PostgresConnection: PostgresDatabase {
5050
let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
5151
return rows.all().map { allrows in
5252
let r = allrows.map { psqlRow -> PostgresRow in
53-
let columns = psqlRow.data.columns.map {
53+
let columns = psqlRow.data.map {
5454
PostgresMessage.DataRow.Column(value: $0)
5555
}
5656
return PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
@@ -112,7 +112,7 @@ extension PSQLRowStream {
112112

113113
func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
114114
self.onRow { psqlRow in
115-
let columns = psqlRow.data.columns.map {
115+
let columns = psqlRow.data.map {
116116
PostgresMessage.DataRow.Column(value: $0)
117117
}
118118

Sources/PostgresNIO/New/Connection State Machine/ConnectionStateMachine.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,18 @@ struct ConnectionStateMachine {
8787
case sendParseDescribeBindExecuteSync(query: String, binds: [PSQLEncodable])
8888
case sendBindExecuteSync(statementName: String, binds: [PSQLEncodable])
8989
case failQuery(ExtendedQueryContext, with: PSQLError, cleanupContext: CleanUpContext?)
90-
case succeedQuery(ExtendedQueryContext, columns: [PSQLBackendMessage.RowDescription.Column])
90+
case succeedQuery(ExtendedQueryContext, columns: [RowDescription.Column])
9191
case succeedQueryNoRowsComming(ExtendedQueryContext, commandTag: String)
9292

9393
// --- streaming actions
9494
// actions if query has requested next row but we are waiting for backend
95-
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
96-
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
95+
case forwardRows([DataRow])
96+
case forwardStreamComplete([DataRow], commandTag: String)
9797
case forwardStreamError(PSQLError, read: Bool, cleanupContext: CleanUpContext?)
9898

9999
// Prepare statement actions
100100
case sendParseDescribeSync(name: String, query: String)
101-
case succeedPreparedStatementCreation(PrepareStatementContext, with: PSQLBackendMessage.RowDescription?)
101+
case succeedPreparedStatementCreation(PrepareStatementContext, with: RowDescription?)
102102
case failPreparedStatementCreation(PrepareStatementContext, with: PSQLError, cleanupContext: CleanUpContext?)
103103

104104
// Close actions
@@ -713,7 +713,7 @@ struct ConnectionStateMachine {
713713
}
714714
}
715715

716-
mutating func rowDescriptionReceived(_ description: PSQLBackendMessage.RowDescription) -> ConnectionAction {
716+
mutating func rowDescriptionReceived(_ description: RowDescription) -> ConnectionAction {
717717
switch self.state {
718718
case .extendedQuery(var queryState, let connectionContext) where !queryState.isComplete:
719719
return self.avoidingStateMachineCoW { machine -> ConnectionAction in
@@ -791,7 +791,7 @@ struct ConnectionStateMachine {
791791
}
792792
}
793793

794-
mutating func dataRowReceived(_ dataRow: PSQLBackendMessage.DataRow) -> ConnectionAction {
794+
mutating func dataRowReceived(_ dataRow: DataRow) -> ConnectionAction {
795795
guard case .extendedQuery(var queryState, let connectionContext) = self.state, !queryState.isComplete else {
796796
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.dataRow(dataRow)))
797797
}

Sources/PostgresNIO/New/Connection State Machine/ExtendedQueryStateMachine.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ struct ExtendedQueryStateMachine {
88

99
case parseCompleteReceived(ExtendedQueryContext)
1010
case parameterDescriptionReceived(ExtendedQueryContext)
11-
case rowDescriptionReceived(ExtendedQueryContext, [PSQLBackendMessage.RowDescription.Column])
11+
case rowDescriptionReceived(ExtendedQueryContext, [RowDescription.Column])
1212
case noDataMessageReceived(ExtendedQueryContext)
1313

1414
/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
1515
/// used after receiving a `bindComplete` message
1616
case bindCompleteReceived(ExtendedQueryContext)
17-
case streaming([PSQLBackendMessage.RowDescription.Column], RowStreamStateMachine)
17+
case streaming([RowDescription.Column], RowStreamStateMachine)
1818

1919
case commandComplete(commandTag: String)
2020
case error(PSQLError)
@@ -28,13 +28,13 @@ struct ExtendedQueryStateMachine {
2828

2929
// --- general actions
3030
case failQuery(ExtendedQueryContext, with: PSQLError)
31-
case succeedQuery(ExtendedQueryContext, columns: [PSQLBackendMessage.RowDescription.Column])
31+
case succeedQuery(ExtendedQueryContext, columns: [RowDescription.Column])
3232
case succeedQueryNoRowsComming(ExtendedQueryContext, commandTag: String)
3333

3434
// --- streaming actions
3535
// actions if query has requested next row but we are waiting for backend
36-
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
37-
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
36+
case forwardRows([DataRow])
37+
case forwardStreamComplete([DataRow], commandTag: String)
3838
case forwardStreamError(PSQLError, read: Bool)
3939

4040
case read
@@ -105,7 +105,7 @@ struct ExtendedQueryStateMachine {
105105
}
106106
}
107107

108-
mutating func rowDescriptionReceived(_ rowDescription: PSQLBackendMessage.RowDescription) -> Action {
108+
mutating func rowDescriptionReceived(_ rowDescription: RowDescription) -> Action {
109109
guard case .parameterDescriptionReceived(let queryContext) = self.state else {
110110
return self.setAndFireError(.unexpectedBackendMessage(.rowDescription(rowDescription)))
111111
}
@@ -119,7 +119,7 @@ struct ExtendedQueryStateMachine {
119119

120120
// In Postgres extended queries we always request the response rows to be returned in
121121
// `.binary` format.
122-
let columns = rowDescription.columns.map { column -> PSQLBackendMessage.RowDescription.Column in
122+
let columns = rowDescription.columns.map { column -> RowDescription.Column in
123123
var column = column
124124
column.format = .binary
125125
return column
@@ -155,12 +155,12 @@ struct ExtendedQueryStateMachine {
155155
}
156156
}
157157

158-
mutating func dataRowReceived(_ dataRow: PSQLBackendMessage.DataRow) -> Action {
158+
mutating func dataRowReceived(_ dataRow: DataRow) -> Action {
159159
switch self.state {
160160
case .streaming(let columns, var demandStateMachine):
161161
// When receiving a data row, we must ensure that the data row column count
162162
// matches the previously received row description column count.
163-
guard dataRow.columns.count == columns.count else {
163+
guard dataRow.columnCount == columns.count else {
164164
return self.setAndFireError(.unexpectedBackendMessage(.dataRow(dataRow)))
165165
}
166166

Sources/PostgresNIO/New/Connection State Machine/PrepareStatementStateMachine.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ struct PrepareStatementStateMachine {
1515

1616
enum Action {
1717
case sendParseDescribeSync(name: String, query: String)
18-
case succeedPreparedStatementCreation(PrepareStatementContext, with: PSQLBackendMessage.RowDescription?)
18+
case succeedPreparedStatementCreation(PrepareStatementContext, with: RowDescription?)
1919
case failPreparedStatementCreation(PrepareStatementContext, with: PSQLError)
2020

2121
case read
@@ -72,7 +72,7 @@ struct PrepareStatementStateMachine {
7272
return .succeedPreparedStatementCreation(queryContext, with: nil)
7373
}
7474

75-
mutating func rowDescriptionReceived(_ rowDescription: PSQLBackendMessage.RowDescription) -> Action {
75+
mutating func rowDescriptionReceived(_ rowDescription: RowDescription) -> Action {
7676
guard case .parameterDescriptionReceived(let queryContext) = self.state else {
7777
return self.setAndFireError(.unexpectedBackendMessage(.rowDescription(rowDescription)))
7878
}

Sources/PostgresNIO/New/Connection State Machine/RowStreamStateMachine.swift

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,28 @@ struct RowStreamStateMachine {
1313

1414
private enum State {
1515
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
16-
case waitingForRows(CircularBuffer<PSQLBackendMessage.DataRow>)
16+
case waitingForRows([DataRow])
1717
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
1818
/// empty. It is preserved for performance reasons.
19-
case waitingForReadOrDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
19+
case waitingForReadOrDemand([DataRow])
2020
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
21-
case waitingForRead(CircularBuffer<PSQLBackendMessage.DataRow>)
21+
case waitingForRead([DataRow])
2222
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
2323
/// preserved for performance reasons.
24-
case waitingForDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
24+
case waitingForDemand([DataRow])
2525

2626
case modifying
2727
}
2828

2929
private var state: State
3030

3131
init() {
32-
self.state = .waitingForRows(CircularBuffer(initialCapacity: 32))
32+
var buffer = [DataRow]()
33+
buffer.reserveCapacity(32)
34+
self.state = .waitingForRows(buffer)
3335
}
3436

35-
mutating func receivedRow(_ newRow: PSQLBackendMessage.DataRow) {
37+
mutating func receivedRow(_ newRow: DataRow) {
3638
switch self.state {
3739
case .waitingForRows(var buffer):
3840
self.state = .modifying
@@ -66,7 +68,7 @@ struct RowStreamStateMachine {
6668
}
6769
}
6870

69-
mutating func channelReadComplete() -> CircularBuffer<PSQLBackendMessage.DataRow>? {
71+
mutating func channelReadComplete() -> [DataRow]? {
7072
switch self.state {
7173
case .waitingForRows(let buffer):
7274
if buffer.isEmpty {
@@ -139,7 +141,7 @@ struct RowStreamStateMachine {
139141
}
140142
}
141143

142-
mutating func end() -> CircularBuffer<PSQLBackendMessage.DataRow> {
144+
mutating func end() -> [DataRow] {
143145
switch self.state {
144146
case .waitingForRows(let buffer):
145147
return buffer
Lines changed: 106 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,117 @@
11
import NIOCore
22

3-
extension PSQLBackendMessage {
3+
/// A backend data row message.
4+
///
5+
/// - NOTE: This struct is not part of the ``PSQLBackendMessage`` namespace even
6+
/// though this is where it actually belongs. The reason for this is, that we want
7+
/// this type to be @usableFromInline. If a type is made @usableFromInline in an
8+
/// enclosing type, the enclosing type must be @usableFromInline as well.
9+
/// Not putting `DataRow` in ``PSQLBackendMessage`` is our way to trick
10+
/// the Swift compiler
11+
struct DataRow: PSQLBackendMessage.PayloadDecodable, Equatable {
412

5-
struct DataRow: PayloadDecodable, Equatable {
6-
7-
var columns: [ByteBuffer?]
13+
var columnCount: Int16
14+
15+
var bytes: ByteBuffer
16+
17+
static func decode(from buffer: inout ByteBuffer) throws -> Self {
18+
try buffer.ensureAtLeastNBytesRemaining(2)
19+
let columnCount = buffer.readInteger(as: Int16.self)!
20+
let firstColumnIndex = buffer.readerIndex
821

9-
static func decode(from buffer: inout ByteBuffer) throws -> Self {
22+
for _ in 0..<columnCount {
1023
try buffer.ensureAtLeastNBytesRemaining(2)
11-
let columnCount = buffer.readInteger(as: Int16.self)!
24+
let bufferLength = Int(buffer.readInteger(as: Int32.self)!)
1225

13-
var result = [ByteBuffer?]()
14-
result.reserveCapacity(Int(columnCount))
15-
16-
for _ in 0..<columnCount {
17-
try buffer.ensureAtLeastNBytesRemaining(2)
18-
let bufferLength = Int(buffer.readInteger(as: Int32.self)!)
19-
20-
guard bufferLength >= 0 else {
21-
result.append(nil)
22-
continue
23-
}
24-
25-
try buffer.ensureAtLeastNBytesRemaining(bufferLength)
26-
let columnBuffer = buffer.readSlice(length: Int(bufferLength))!
27-
28-
result.append(columnBuffer)
26+
guard bufferLength >= 0 else {
27+
// if buffer length is negative, this means that the value is null
28+
continue
2929
}
3030

31-
return DataRow(columns: result)
31+
try buffer.ensureAtLeastNBytesRemaining(bufferLength)
32+
buffer.moveReaderIndex(forwardBy: bufferLength)
33+
}
34+
35+
try buffer.ensureExactNBytesRemaining(0)
36+
37+
buffer.moveReaderIndex(to: firstColumnIndex)
38+
let columnSlice = buffer.readSlice(length: buffer.readableBytes)!
39+
return DataRow(columnCount: columnCount, bytes: columnSlice)
40+
}
41+
}
42+
43+
extension DataRow: Sequence {
44+
typealias Element = ByteBuffer?
45+
46+
// There is no contiguous storage available... Sadly
47+
func withContiguousStorageIfAvailable<R>(_ body: (UnsafeBufferPointer<ByteBuffer?>) throws -> R) rethrows -> R? {
48+
nil
49+
}
50+
}
51+
52+
extension DataRow: Collection {
53+
54+
struct ColumnIndex: Comparable {
55+
var offset: Int
56+
57+
init(_ index: Int) {
58+
self.offset = index
59+
}
60+
61+
// Only needed implementation for comparable. The compiler synthesizes the rest from this.
62+
static func < (lhs: Self, rhs: Self) -> Bool {
63+
lhs.offset < rhs.offset
64+
}
65+
}
66+
67+
typealias Index = DataRow.ColumnIndex
68+
69+
var startIndex: ColumnIndex {
70+
ColumnIndex(self.bytes.readerIndex)
71+
}
72+
73+
var endIndex: ColumnIndex {
74+
ColumnIndex(self.bytes.readerIndex + self.bytes.readableBytes)
75+
}
76+
77+
var count: Int {
78+
Int(self.columnCount)
79+
}
80+
81+
func index(after index: ColumnIndex) -> ColumnIndex {
82+
guard index < self.endIndex else {
83+
preconditionFailure("index out of bounds")
84+
}
85+
var elementLength = Int(self.bytes.getInteger(at: index.offset, as: Int32.self)!)
86+
if elementLength < 0 {
87+
elementLength = 0
88+
}
89+
return ColumnIndex(index.offset + MemoryLayout<Int32>.size + elementLength)
90+
}
91+
92+
subscript(index: ColumnIndex) -> Element {
93+
guard index < self.endIndex else {
94+
preconditionFailure("index out of bounds")
95+
}
96+
let elementLength = Int(self.bytes.getInteger(at: index.offset, as: Int32.self)!)
97+
if elementLength < 0 {
98+
return nil
99+
}
100+
return self.bytes.getSlice(at: index.offset + MemoryLayout<Int32>.size, length: elementLength)!
101+
}
102+
}
103+
104+
extension DataRow {
105+
subscript(column index: Int) -> Element {
106+
guard index < self.columnCount else {
107+
preconditionFailure("index out of bounds")
32108
}
109+
110+
var byteIndex = self.startIndex
111+
for _ in 0..<index {
112+
byteIndex = self.index(after: byteIndex)
113+
}
114+
115+
return self[byteIndex]
33116
}
34117
}

0 commit comments

Comments
 (0)