Skip to content

Commit 733a7c0

Browse files
committed
allow to use pool of ydb servers
1 parent 21a4270 commit 733a7c0

File tree

2 files changed

+115
-9
lines changed

2 files changed

+115
-9
lines changed

tools/greenplum-to-pg-tests/cmd/check_pg_queries.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func init() {
5656
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.sessionsLogNeedSort, "query-log-need-sort", false, "Sort query log in memory before start")
5757
must0(checkPgQueriesCmd.MarkPersistentFlagRequired("query-log"))
5858

59-
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.includeFailed, "include-failed", false, "Extract sessions with failed transactions")
59+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.includeFailed, "include-failed", true, "Extract sessions with failed transactions")
6060
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.ydbConnectionString, "ydb-connection", "grpc://localhost:2136/local", "Connection string to ydb server for check queries")
6161
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.limitRequests, "requests-limit", 0, "Limit number of parse requests, 0 mean unlimited")
6262
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.rulesFile, "rules-file", "issues.yaml", "Rules for detect issue. Set empty for skip read rules.")
@@ -106,10 +106,8 @@ var checkPgQueriesCmd = &cobra.Command{
106106

107107
log.Println("Connecting to ydb...")
108108
connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
109-
db := must(ydb.Open(
110-
connectCtx, checkPgQueriesConfig.ydbConnectionString,
111-
internal.GetYdbCredentials(),
112-
))
109+
connectionStrings := strings.Split(checkPgQueriesConfig.ydbConnectionString, ",")
110+
dbPool := internal.OpenYdbPool(connectCtx, connectionStrings, []ydb.Option{internal.GetYdbCredentials()})
113111
cancel()
114112

115113
var queries <-chan string
@@ -122,7 +120,7 @@ var checkPgQueriesCmd = &cobra.Command{
122120

123121
log.Println("Start check queries")
124122
var stats QueryStats
125-
checkQueries(rules, &stats, db, queries)
123+
checkQueries(rules, &stats, dbPool, queries)
126124

127125
if checkPgQueriesConfig.writeRulesWithStat != "" {
128126
rules.UpdateFromStats(stats, checkPgQueriesConfig.sortRulesByCount)
@@ -210,6 +208,9 @@ func readSortedQueries(reader io.ReadCloser) <-chan string {
210208
// pass
211209
}
212210
}
211+
if !item.TransactionSuccess && !checkPgQueriesConfig.includeFailed {
212+
continue
213+
}
213214

214215
queries <- item.Query
215216
counter++
@@ -345,6 +346,10 @@ func extractQueries(sessions []internal.Session) <-chan string {
345346
needRemoveLine := false
346347
for _, session := range sessions {
347348
for _, transaction := range session.Transactions {
349+
if !transaction.Success && !checkPgQueriesConfig.includeFailed {
350+
continue
351+
}
352+
348353
for _, pgQuery := range transaction.Queries {
349354
queryIndex++
350355
if queryIndex%checkPgQueriesConfig.printProgressEveryQueries == 0 {
@@ -371,7 +376,7 @@ func printDeleteLine() {
371376
fmt.Printf("\033[1A\033[K")
372377
}
373378

374-
func checkQueries(rules Rules, stats *QueryStats, db *ydb.Driver, queries <-chan string) {
379+
func checkQueries(rules Rules, stats *QueryStats, dbPool *internal.YdbPool, queries <-chan string) {
375380
if checkPgQueriesConfig.checkersCount < 1 {
376381
log.Fatalf("can't start less then 1 checker, got: %v", checkPgQueriesConfig.checkersCount)
377382
}
@@ -384,7 +389,7 @@ func checkQueries(rules Rules, stats *QueryStats, db *ydb.Driver, queries <-chan
384389
go func() {
385390
defer wg.Done()
386391
for q := range queries {
387-
checkQuery(stats, rules, db, q)
392+
checkQuery(stats, rules, dbPool, q)
388393
counter := itemsCounter.Add(1)
389394
if counter%writeStatEveryItems == 0 && checkPgQueriesConfig.writeStatPath != "" {
390395
if err := stats.SaveToFile(checkPgQueriesConfig.writeStatPath); err != nil {
@@ -405,7 +410,10 @@ const (
405410
checkResultErrUnknown
406411
)
407412

408-
func checkQuery(stat *QueryStats, rules Rules, db *ydb.Driver, queryText string) (reason string, checkResult checkResultType) {
413+
func checkQuery(stat *QueryStats, rules Rules, dbPool *internal.YdbPool, queryText string) (reason string, checkResult checkResultType) {
414+
db := dbPool.Get()
415+
defer dbPool.Release(db)
416+
409417
queryText = strings.TrimSpace(queryText)
410418
queryText = fixSchemaNames(queryText)
411419
queryText = fixCreateTable(queryText)
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"log"
6+
"sync"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3"
9+
)
10+
11+
type YdbPool struct {
12+
m sync.Mutex
13+
drivers []driverInflight
14+
}
15+
16+
type driverInflight struct {
17+
driver *ydb.Driver
18+
inflyght int
19+
}
20+
21+
func OpenYdbPool(ctx context.Context, connectionStrings []string, options []ydb.Option) *YdbPool {
22+
var m sync.Mutex
23+
drivers := make([]*ydb.Driver, 0, len(connectionStrings))
24+
25+
var wg sync.WaitGroup
26+
for i := range connectionStrings {
27+
cs := connectionStrings[i]
28+
wg.Add(1)
29+
go func() {
30+
defer wg.Done()
31+
32+
driver, err := ydb.Open(ctx, cs, options...)
33+
if err == nil {
34+
m.Lock()
35+
drivers = append(drivers, driver)
36+
m.Unlock()
37+
} else {
38+
log.Printf("Failed connection to %q: %v", cs, err)
39+
}
40+
}()
41+
}
42+
wg.Wait()
43+
44+
if len(drivers) == 0 {
45+
log.Fatalf("failed to connect for all endpoints")
46+
}
47+
48+
return NewYdbPool(drivers)
49+
}
50+
51+
func NewYdbPool(drivers []*ydb.Driver) *YdbPool {
52+
if len(drivers) == 0 {
53+
panic("can't create pool without drivers")
54+
}
55+
res := &YdbPool{
56+
drivers: make([]driverInflight, len(drivers)),
57+
}
58+
for i, driver := range drivers {
59+
res.drivers[i] = driverInflight{
60+
driver: driver,
61+
inflyght: 0,
62+
}
63+
}
64+
return res
65+
}
66+
67+
// Get return driver with minimal current inflight
68+
func (p *YdbPool) Get() *ydb.Driver {
69+
p.m.Lock()
70+
defer p.m.Unlock()
71+
72+
minInflyght := &p.drivers[0]
73+
for i := range p.drivers {
74+
if p.drivers[i].inflyght < minInflyght.inflyght {
75+
minInflyght = &p.drivers[i]
76+
}
77+
}
78+
79+
minInflyght.inflyght++
80+
return minInflyght.driver
81+
}
82+
83+
func (p *YdbPool) Release(driver *ydb.Driver) {
84+
p.m.Lock()
85+
defer p.m.Unlock()
86+
87+
for i := range p.drivers {
88+
if p.drivers[i].driver == driver {
89+
p.drivers[i].inflyght--
90+
if p.drivers[i].inflyght < 0 {
91+
panic("the ydb driver released more times, then get")
92+
}
93+
return
94+
}
95+
}
96+
97+
panic("the driver doesn't exists in the pool")
98+
}

0 commit comments

Comments
 (0)