Skip to content

Commit 20c994a

Browse files
committed
Implement file-backed results: write query output to a temporary file and read it back from disk, so reading the result does not allocate an additional in-memory buffer
1 parent 44defe1 commit 20c994a

File tree

3 files changed

+84
-14
lines changed

3 files changed

+84
-14
lines changed

chdb/driver/arrow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package chdbdriver
33
import (
44
"database/sql/driver"
55
"fmt"
6+
"os"
67
"reflect"
78
"time"
89

@@ -19,6 +20,7 @@ type arrowRows struct {
1920
reader *ipc.FileReader
2021
curRecord arrow.Record
2122
curRow int64
23+
fd *os.File
2224
}
2325

2426
func (r *arrowRows) Columns() (out []string) {
@@ -37,6 +39,10 @@ func (r *arrowRows) Close() error {
3739
_ = r.reader.Close()
3840
r.reader = nil
3941
r.localResult = nil
42+
if r.fd != nil {
43+
_ = r.fd.Close()
44+
_ = os.Remove(r.fd.Name())
45+
}
4046
return nil
4147
}
4248

chdb/driver/driver.go

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ import (
66
"database/sql"
77
"database/sql/driver"
88
"fmt"
9+
"math/rand"
10+
"os"
911
"strconv"
1012
"strings"
13+
"time"
1114

1215
"github.com/chdb-io/chdb-go/chdb"
1316
"github.com/chdb-io/chdb-go/chdbstable"
@@ -34,6 +37,10 @@ const (
3437
defaultBufferSize = 512
3538
)
3639

40+
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
41+
42+
var seededRand *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))
43+
3744
func (d DriverType) String() string {
3845
switch d {
3946
case ARROW:
@@ -46,20 +53,51 @@ func (d DriverType) String() string {
4653
return ""
4754
}
4855

49-
func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int, useUnsafe bool) (driver.Rows, error) {
56+
func (d DriverType) PrepareRows(result *chdbstable.LocalResult, buf []byte, bufSize int, useUnsafe bool, filePath string) (driver.Rows, error) {
5057
switch d {
5158
case ARROW:
52-
reader, err := ipc.NewFileReader(bytes.NewReader(buf))
53-
if err != nil {
54-
return nil, err
59+
var reader *ipc.FileReader
60+
var err error
61+
var fd *os.File
62+
if filePath != "" {
63+
fd, err = os.Open(filePath)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
reader, err = ipc.NewFileReader(fd)
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
} else {
74+
reader, err = ipc.NewFileReader(bytes.NewReader(buf))
75+
if err != nil {
76+
return nil, err
77+
}
5578
}
56-
return &arrowRows{localResult: result, reader: reader}, nil
79+
80+
return &arrowRows{localResult: result, reader: reader, fd: fd}, nil
5781
case PARQUET:
58-
reader := parquet.NewGenericReader[any](bytes.NewReader(buf))
82+
var reader *parquet.GenericReader[any]
83+
var fd *os.File
84+
if filePath != "" {
85+
fl, err := os.Open(filePath)
86+
if err != nil {
87+
return nil, err
88+
}
89+
fd = fl
90+
91+
reader = parquet.NewGenericReader[any](fl)
92+
} else {
93+
reader = parquet.NewGenericReader[any](bytes.NewReader(buf))
94+
}
95+
5996
return &parquetRows{
6097
localResult: result, reader: reader,
6198
bufferSize: bufSize, needNewBuffer: true,
6299
useUnsafeStringReader: useUnsafe,
100+
fd: fd,
63101
}, nil
64102
}
65103
return nil, fmt.Errorf("Unsupported driver type")
@@ -97,7 +135,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
97135
cc := &conn{
98136
udfPath: c.udfPath, session: c.session,
99137
driverType: c.driverType, bufferSize: c.bufferSize,
100-
useUnsafe: c.useUnsafe,
138+
useUnsafe: c.useUnsafe,
139+
useFileInsteadOfMemory: true,
101140
}
102141
cc.SetupQueryFun()
103142
return cc, nil
@@ -184,12 +223,13 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
184223
}
185224

186225
type conn struct {
187-
udfPath string
188-
driverType DriverType
189-
bufferSize int
190-
useUnsafe bool
191-
session *chdb.Session
192-
QueryFun queryHandle
226+
udfPath string
227+
driverType DriverType
228+
bufferSize int
229+
useUnsafe bool
230+
useFileInsteadOfMemory bool
231+
session *chdb.Session
232+
QueryFun queryHandle
193233
}
194234

195235
func (c *conn) Close() error {
@@ -230,14 +270,31 @@ func (c *conn) compileArguments(query string, args []driver.NamedValue) (string,
230270
} else {
231271
compiledQuery = query
232272
}
273+
233274
return compiledQuery, nil
234275
}
235276

277+
func (c *conn) createRandomFilePath(size int) string {
278+
b := make([]byte, size)
279+
for i := range b {
280+
b[i] = charset[seededRand.Intn(len(charset))]
281+
}
282+
return string(b)
283+
284+
}
285+
236286
func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
237287
compiledQuery, err := c.compileArguments(query, args)
238288
if err != nil {
239289
return nil, err
240290
}
291+
var filePath string
292+
if c.useFileInsteadOfMemory {
293+
compiledQuery = strings.TrimSuffix(compiledQuery, ";")
294+
compiledQuery += " INTO OUTFILE "
295+
filePath = fmt.Sprintf("/tmp/%s.%s", c.createRandomFilePath(16), strings.ToLower(c.driverType.String()))
296+
compiledQuery += fmt.Sprintf("'%s'", filePath)
297+
}
241298
result, err := c.QueryFun(compiledQuery, c.driverType.String(), c.udfPath)
242299
if err != nil {
243300
return nil, err
@@ -247,7 +304,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
247304
if buf == nil {
248305
return nil, fmt.Errorf("result is nil")
249306
}
250-
return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe)
307+
return c.driverType.PrepareRows(result, buf, c.bufferSize, c.useUnsafe, filePath)
251308

252309
}
253310

chdb/driver/parquet.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"database/sql/driver"
55
"fmt"
66
"io"
7+
"os"
78
"time"
89
"unsafe"
910

@@ -33,6 +34,7 @@ type parquetRows struct {
3334
curRow int64 // row counter
3435
needNewBuffer bool
3536
useUnsafeStringReader bool
37+
fd *os.File
3638
}
3739

3840
func (r *parquetRows) Columns() (out []string) {
@@ -53,6 +55,11 @@ func (r *parquetRows) Close() error {
5355
r.reader = nil
5456
r.localResult = nil
5557
r.buffer = nil
58+
if r.fd != nil {
59+
r.fd.Close()
60+
os.Remove(r.fd.Name())
61+
r.fd = nil
62+
}
5663
return nil
5764
}
5865

0 commit comments

Comments
 (0)