Skip to content

Commit f3d6a72

Browse files
authored
Merge pull request #11 from agoncear-mwb/main
Added explicit key for using unsafe string reader
2 parents 82a5138 + 02f64ea commit f3d6a72

File tree

2 files changed

+47
-19
lines changed

2 files changed

+47
-19
lines changed

chdb/driver/driver.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ const (
2525
INVALID
2626
)
2727

28-
const sessionOptionKey = "session"
29-
const udfPathOptionKey = "udfPath"
30-
const driverTypeKey = "driverType"
31-
const driverBufferSizeKey = "bufferSize"
32-
33-
const defaultBufferSize = 512
28+
const (
29+
sessionOptionKey = "session"
30+
udfPathOptionKey = "udfPath"
31+
driverTypeKey = "driverType"
32+
useUnsafeStringReaderKey = "useUnsafeStringReader"
33+
driverBufferSizeKey = "bufferSize"
34+
defaultBufferSize = 512
35+
)
3436

3537
func (d DriverType) String() string {
3638
switch d {
@@ -44,7 +46,7 @@ func (d DriverType) String() string {
4446
return ""
4547
}
4648

47-
func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int) (driver.Rows, error) {
49+
func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int, useUnsafe bool) (driver.Rows, error) {
4850
switch d {
4951
case ARROW:
5052
reader, err := ipc.NewFileReader(bytes.NewReader(buf))
@@ -54,7 +56,11 @@ func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufS
5456
return &arrowRows{localResult: result, reader: reader}, nil
5557
case PARQUET:
5658
reader := parquet.NewGenericReader[any](bytes.NewReader(buf))
57-
return &parquetRows{localResult: result, reader: reader, bufferSize: bufSize, needNewBuffer: true}, nil
59+
return &parquetRows{
60+
localResult: result, reader: reader,
61+
bufferSize: bufSize, needNewBuffer: true,
62+
useUnsafeStringReader: useUnsafe,
63+
}, nil
5864
}
5965
return nil, fmt.Errorf("Unsupported driver type")
6066
}
@@ -79,6 +85,7 @@ type connector struct {
7985
udfPath string
8086
driverType DriverType
8187
bufferSize int
88+
useUnsafe bool
8289
session *chdb.Session
8390
}
8491

@@ -87,7 +94,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
8794
if c.driverType == INVALID {
8895
return nil, fmt.Errorf("DriverType not supported")
8996
}
90-
cc := &conn{udfPath: c.udfPath, session: c.session, driverType: c.driverType, bufferSize: c.bufferSize}
97+
cc := &conn{
98+
udfPath: c.udfPath, session: c.session,
99+
driverType: c.driverType, bufferSize: c.bufferSize,
100+
useUnsafe: c.useUnsafe,
101+
}
91102
cc.SetupQueryFun()
92103
return cc, nil
93104
}
@@ -138,6 +149,12 @@ func NewConnect(opts map[string]string) (ret *connector, err error) {
138149
} else {
139150
ret.bufferSize = defaultBufferSize
140151
}
152+
useUnsafe, ok := opts[useUnsafeStringReaderKey]
153+
if ok {
154+
if strings.ToLower(useUnsafe) == "true" {
155+
ret.useUnsafe = true
156+
}
157+
}
141158

142159
udfPath, ok := opts[udfPathOptionKey]
143160
if ok {
@@ -170,6 +187,7 @@ type conn struct {
170187
udfPath string
171188
driverType DriverType
172189
bufferSize int
190+
useUnsafe bool
173191
session *chdb.Session
174192
QueryFun queryHandle
175193
}
@@ -229,7 +247,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
229247
if buf == nil {
230248
return nil, fmt.Errorf("result is nil")
231249
}
232-
return c.driverType.PrepareRows(result, buf, c.bufferSize)
250+
return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe)
233251

234252
}
235253

chdb/driver/parquet.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/parquet-go/parquet-go"
1414
)
1515

16+
// NOTE: this function is strictly unsafe: the returned string shares memory with data, so it can lead to undefined behavior if the underlying slice goes out of scope or is modified while the string is in use.
17+
// Use this function ONLY if you are sure that neither of these conditions can occur and you need to allocate as little memory as possible.
1618
func bytesToString(data []byte) string {
1719
return *(*string)(unsafe.Pointer(&data))
1820
}
@@ -22,14 +24,15 @@ func getStringFromBytes(v parquet.Value) string {
2224
}
2325

2426
type parquetRows struct {
25-
localResult *chdbstable.LocalResult // result from clickhouse
26-
reader *parquet.GenericReader[any] // parquet reader
27-
curRecord parquet.Row // TODO: delete this?
28-
buffer []parquet.Row // record buffer
29-
bufferSize int // amount of records to preload into buffer
30-
bufferIndex int64 // index in the current buffer
31-
curRow int64 // row counter
32-
needNewBuffer bool
27+
localResult *chdbstable.LocalResult // result from clickhouse
28+
reader *parquet.GenericReader[any] // parquet reader
29+
curRecord parquet.Row // TODO: delete this?
30+
buffer []parquet.Row // record buffer
31+
bufferSize int // amount of records to preload into buffer
32+
bufferIndex int64 // index in the current buffer
33+
curRow int64 // row counter
34+
needNewBuffer bool
35+
useUnsafeStringReader bool
3336
}
3437

3538
func (r *parquetRows) Columns() (out []string) {
@@ -100,7 +103,14 @@ func (r *parquetRows) Next(dest []driver.Value) error {
100103
}
101104
switch r.ColumnTypeDatabaseTypeName(columnIndex) {
102105
case "STRING":
103-
dest[columnIndex] = getStringFromBytes(curVal)
106+
// we check whether the user initialized the connection with the `useUnsafeStringReader` option; if so, we use the `getStringFromBytes` function.
107+
// otherwise, we fall back to the traditional way and allocate a new string
108+
if r.useUnsafeStringReader {
109+
dest[columnIndex] = getStringFromBytes(curVal)
110+
} else {
111+
dest[columnIndex] = string(curVal.ByteArray())
112+
}
113+
104114
case "INT8", "INT(8,true)":
105115
dest[columnIndex] = int8(curVal.Int32()) //check if this is correct
106116
case "INT16", "INT(16,true)":

0 commit comments

Comments
 (0)