Skip to content

Commit c7265a3

Browse files
committed
parallel execution
1 parent 99a397d commit c7265a3

File tree

4 files changed

+122
-107
lines changed

4 files changed

+122
-107
lines changed

tools/greenplum-to-pg-tests/cmd/extraxtSessions.go renamed to tools/greenplum-to-pg-tests/cmd/check_pg_queries.go

Lines changed: 110 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ import (
1515
"regexp"
1616
"slices"
1717
"strings"
18+
"sync"
1819
"time"
1920

2021
"github.com/spf13/cobra"
@@ -25,7 +26,7 @@ import (
2526
"github.com/ydb-platform/postgres-compatibility-tests/tools/greenplum-to-pg-tests/internal"
2627
)
2728

28-
var extractSessionsConfig struct {
29+
var checkPgQueriesConfig struct {
2930
schemeDumpFile string
3031
sessionsLog string
3132
includeFailed bool
@@ -36,60 +37,58 @@ var extractSessionsConfig struct {
3637
sortRulesByCount bool
3738
printKnownIssues bool
3839
printQueryForKnownIssue bool
39-
filterReason string
40-
errorLimit int
4140
printErrorsInProgress bool
4241
printStats bool
4342
printProgressEveryQueries int
4443
writeStatPath string
44+
checkersCount int
4545
}
4646

4747
func init() {
48-
rootCmd.AddCommand(extractSessionsCmd)
49-
50-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.schemeDumpFile, "schemedump-file", "", "Path to dump of db schema. Set empty for skip read schema.")
51-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.sessionsLog, "query-log", "", "Set path to input sessions log")
52-
must0(extractSessionsCmd.MarkPersistentFlagRequired("query-log"))
53-
54-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.includeFailed, "include-failed", false, "Extract sessions with failed transactions")
55-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.ydbConnectionString, "ydb-connection", "grpc://localhost:2136/local", "Connection string to ydb server for check queries")
56-
extractSessionsCmd.PersistentFlags().IntVar(&extractSessionsConfig.limitRequests, "requests-limit", 1000, "Limit number of parse requests, 0 mean unlimited")
57-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.rulesFile, "rules-file", "issues.yaml", "Rules for detect issue. Set empty for skip read rules.")
58-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.writeRulesWithStat, "write-updated-rules", "", "Write rules with updated stats, may be same or other file as for rules-file")
59-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.sortRulesByCount, "sort-updates-rules-by-count", true, "")
60-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.printKnownIssues, "print-known-issues", false, "Print known issues instead of unknown")
61-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.printQueryForKnownIssue, "print-query-for-known-issues", true, "Print query for known issues")
62-
extractSessionsCmd.PersistentFlags().IntVar(&extractSessionsConfig.errorLimit, "print-errors-limit", 0, "Limit of printed errors. 0 mean infinite")
63-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.filterReason, "reason-filter", "", "Filter printer queries and reasons by regexp")
64-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.printErrorsInProgress, "print-progress", false, "Print queries in progress")
65-
extractSessionsCmd.PersistentFlags().BoolVar(&extractSessionsConfig.printStats, "print-stats", true, "Print queries in progress")
66-
extractSessionsCmd.PersistentFlags().IntVar(&extractSessionsConfig.printProgressEveryQueries, "print-progress-every-queries", 10, "Periodically print progress")
67-
extractSessionsCmd.PersistentFlags().StringVar(&extractSessionsConfig.writeStatPath, "write-stat-file", "", "Path to write full stat file if need. Will write example of queries")
48+
rootCmd.AddCommand(checkPgQueriesCmd)
49+
50+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.schemeDumpFile, "schemedump-file", "", "Path to dump of db schema. Set empty for skip read schema.")
51+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.sessionsLog, "query-log", "", "Set path to input sessions log")
52+
must0(checkPgQueriesCmd.MarkPersistentFlagRequired("query-log"))
53+
54+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.includeFailed, "include-failed", false, "Extract sessions with failed transactions")
55+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.ydbConnectionString, "ydb-connection", "grpc://localhost:2136/local", "Connection string to ydb server for check queries")
56+
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.limitRequests, "requests-limit", 0, "Limit number of parse requests, 0 mean unlimited")
57+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.rulesFile, "rules-file", "issues.yaml", "Rules for detect issue. Set empty for skip read rules.")
58+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.writeRulesWithStat, "write-updated-rules", "issues_stat.yaml", "Write rules with updated stats, may be same or other file as for rules-file")
59+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.sortRulesByCount, "sort-updates-rules-by-count", true, "")
60+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printKnownIssues, "print-known-issues", false, "Print known issues instead of unknown")
61+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printQueryForKnownIssue, "print-query-for-known-issues", true, "Print query for known issues")
62+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printErrorsInProgress, "print-progress", false, "Print queries in progress")
63+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printStats, "print-stats", true, "Print queries in progress")
64+
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.printProgressEveryQueries, "print-progress-every-queries", 10, "Periodically print progress")
65+
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.writeStatPath, "write-stat-file", "", "Path to write full stat file if need. Will write example of queries")
66+
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.checkersCount, "check-queries-parallel", 5, "How many queries may be checked in parallel")
6867
}
6968

7069
// checkPgQueriesCmd represents the check-pg-queries command
71-
var extractSessionsCmd = &cobra.Command{
72-
Use: "extract-sessions",
70+
var checkPgQueriesCmd = &cobra.Command{
71+
Use: "check-pg-queries",
7372
Short: "Read session queryies log end extract sessions to files",
7473
Run: func(cmd *cobra.Command, args []string) {
7574
ctx := context.Background()
7675

7776
var rules Rules
78-
if extractSessionsConfig.rulesFile == "" {
77+
if checkPgQueriesConfig.rulesFile == "" {
7978
log.Println("Skip read rules file.")
8079
} else {
81-
log.Printf("Reading rules file %q...", extractSessionsConfig.rulesFile)
82-
if err := rules.LoadFromFile(extractSessionsConfig.rulesFile); err != nil {
80+
log.Printf("Reading rules file %q...", checkPgQueriesConfig.rulesFile)
81+
if err := rules.LoadFromFile(checkPgQueriesConfig.rulesFile); err != nil {
8382
log.Fatalf("Failed to read rules file: %v", err)
8483
}
8584
}
8685

8786
schema := internal.NewPgSchema()
88-
if extractSessionsConfig.schemeDumpFile == "" {
87+
if checkPgQueriesConfig.schemeDumpFile == "" {
8988
log.Println("Skip read session")
9089
} else {
9190
log.Println("Reading schema.. ")
92-
schemaFile, err := os.Open(extractSessionsConfig.schemeDumpFile)
91+
schemaFile, err := os.Open(checkPgQueriesConfig.schemeDumpFile)
9392
if err != nil {
9493
log.Fatalf("Failed to open scheme file")
9594
}
@@ -101,35 +100,42 @@ var extractSessionsCmd = &cobra.Command{
101100
log.Println("Connecting to ydb...")
102101
connectCtx, cancel := context.WithTimeout(ctx, time.Second*10)
103102
db := must(ydb.Open(
104-
connectCtx, extractSessionsConfig.ydbConnectionString,
103+
connectCtx, checkPgQueriesConfig.ydbConnectionString,
105104
internal.GetYdbCredentials(),
106105
))
107106
cancel()
108107

109108
sessions := readSessions()
109+
queries := extractQueries(sessions)
110110

111111
log.Println("Start check queries")
112112
var stats QueryStats
113-
checkQueries(rules, &stats, schema, db, sessions)
113+
checkQueries(rules, &stats, db, queries)
114114

115-
if extractSessionsConfig.writeRulesWithStat != "" {
116-
rules.UpdateFromStats(stats, extractSessionsConfig.sortRulesByCount)
117-
if err := rules.WriteToFile(extractSessionsConfig.writeRulesWithStat); err != nil {
115+
if checkPgQueriesConfig.writeRulesWithStat != "" {
116+
rules.UpdateFromStats(stats, checkPgQueriesConfig.sortRulesByCount)
117+
if err := rules.WriteToFile(checkPgQueriesConfig.writeRulesWithStat); err != nil {
118118
log.Printf("Failed to update rules stat: %v", err)
119119
}
120120
}
121+
122+
if checkPgQueriesConfig.writeStatPath != "" {
123+
if err := stats.SaveToFile(checkPgQueriesConfig.writeStatPath); err != nil {
124+
log.Printf("Failed to save stat file %q: %v", checkPgQueriesConfig.writeStatPath, err)
125+
}
126+
}
121127
},
122128
}
123129

124130
func readSessions() []internal.Session {
125-
reader := must(os.Open(extractSessionsConfig.sessionsLog))
131+
reader := must(os.Open(checkPgQueriesConfig.sessionsLog))
126132
defer reader.Close()
127133

128134
decoder := json.NewDecoder(reader)
129135

130136
sortedLogs := map[int]map[int]map[int]map[int]internal.SessionLogRecord{} // pid/session/transaction/query
131137

132-
limitCount := extractSessionsConfig.limitRequests
138+
limitCount := checkPgQueriesConfig.limitRequests
133139

134140
counter := 0
135141

@@ -215,67 +221,53 @@ readLoop:
215221
return res
216222
}
217223

218-
func checkQueries(rules Rules, stats *QueryStats, pgSchema *internal.PgSchema, db *ydb.Driver, sessions []internal.Session) {
219-
reasonFilter := regexp.MustCompile(extractSessionsConfig.filterReason)
220-
221-
errorLimit := extractSessionsConfig.errorLimit
222-
if errorLimit == 0 {
223-
errorLimit = math.MaxInt
224-
}
224+
func extractQueries(sessions []internal.Session) <-chan string {
225+
queries := make(chan string)
225226

226-
totalQueries := 0
227-
for _, session := range sessions {
228-
for _, transaction := range session.Transactions {
229-
totalQueries += len(transaction.Queries)
227+
go func() {
228+
totalQueries := 0
229+
for _, session := range sessions {
230+
for _, transaction := range session.Transactions {
231+
totalQueries += len(transaction.Queries)
232+
}
230233
}
231-
}
232-
233-
queryIndex := 0
234234

235-
for _, session := range sessions {
236-
for _, transaction := range session.Transactions {
237-
for _, pgQuery := range transaction.Queries {
238-
queryIndex++
239-
if queryIndex%extractSessionsConfig.printProgressEveryQueries == 0 {
240-
percent := float64(queryIndex) / float64(totalQueries) * 100
241-
log.Printf("Checking query %8d/%v (%v)", queryIndex, totalQueries, percent)
242-
}
243-
244-
reason, checkResult := checkQuery(stats, rules, db, pgQuery.Text)
245-
if !reasonFilter.MatchString(reason) {
246-
continue
247-
}
248-
if !extractSessionsConfig.printKnownIssues && checkResult == checkResultErrUnknown {
249-
if extractSessionsConfig.printErrorsInProgress {
250-
log.Printf("Reason: %v\nQuery:%v\n\n", reason, pgQuery.Text)
251-
}
252-
errorLimit--
253-
}
254-
if extractSessionsConfig.printKnownIssues && checkResult == checkResultErrKnown {
255-
if extractSessionsConfig.printErrorsInProgress {
256-
log.Printf("Reason: %v", reason)
257-
if extractSessionsConfig.printQueryForKnownIssue {
258-
log.Printf("Query:\n%v\n\n", pgQuery.Text)
259-
}
235+
queryIndex := 0
236+
for _, session := range sessions {
237+
for _, transaction := range session.Transactions {
238+
for _, pgQuery := range transaction.Queries {
239+
queryIndex++
240+
if queryIndex%checkPgQueriesConfig.printProgressEveryQueries == 0 {
241+
percent := float64(queryIndex) / float64(totalQueries) * 100
242+
log.Printf("Checking query %8d/%v (%0.2f)", queryIndex, totalQueries, percent)
260243
}
261-
errorLimit--
262-
}
263-
if errorLimit == 0 {
264-
log.Println("Error limit reached:", extractSessionsConfig.errorLimit)
265-
return
244+
queries <- pgQuery.Text
266245
}
267246
}
268247
}
269-
}
270248

271-
if extractSessionsConfig.printStats {
272-
stats.PrintStats()
249+
close(queries)
250+
}()
251+
252+
return queries
253+
}
254+
255+
func checkQueries(rules Rules, stats *QueryStats, db *ydb.Driver, queries <-chan string) {
256+
if checkPgQueriesConfig.checkersCount < 1 {
257+
log.Fatalf("can't start less then 1 checker, got: %v", checkPgQueriesConfig.checkersCount)
273258
}
274-
if extractSessionsConfig.writeStatPath != "" {
275-
if err := stats.SaveToFile(extractSessionsConfig.writeStatPath); err != nil {
276-
log.Printf("Failed to write stat: %+v", err)
277-
}
259+
260+
var wg sync.WaitGroup
261+
for range checkPgQueriesConfig.checkersCount {
262+
wg.Add(1)
263+
go func() {
264+
defer wg.Done()
265+
for q := range queries {
266+
checkQuery(stats, rules, db, q)
267+
}
268+
}()
278269
}
270+
wg.Wait()
279271
}
280272

281273
type checkResultType int
@@ -313,16 +305,16 @@ func checkQuery(stat *QueryStats, rules Rules, db *ydb.Driver, queryText string)
313305

314306
issues := internal.ExtractIssues(err)
315307

316-
if issueReason, ok := rules.FindKnownIssue(queryText, issues); ok {
317-
stat.CountAsKnown(reason, queryText)
318-
return issueReason.Name, checkResultErrKnown
308+
knownIssues, unknownIssues := rules.MatchToKnownIssues(queryText, issues)
309+
for _, knownIssue := range knownIssues {
310+
if knownIssue.Name != "" && !knownIssue.Skip {
311+
stat.CountAsKnown(knownIssue.Name, queryText)
312+
return knownIssue.Name, checkResultErrKnown
313+
}
319314
}
320315

321-
reason = fmt.Sprintf("%v (%v): %#v", ydbErr.Name(), ydbErr.Code(), issues)
322-
if len(issues) == 0 {
323-
324-
}
325-
stat.CountAsUnknown(issues, queryText)
316+
reason = fmt.Sprintf("%v (%v): %#v", ydbErr.Name(), ydbErr.Code(), unknownIssues)
317+
stat.CountAsUnknown(unknownIssues, queryText)
326318
return reason, checkResultErrUnknown
327319
}
328320

@@ -363,6 +355,7 @@ var (
363355
)
364356

365357
type QueryStats struct {
358+
m sync.Mutex
366359
OkCount int
367360
TotalCount int
368361

@@ -371,15 +364,24 @@ type QueryStats struct {
371364
}
372365

373366
func (s *QueryStats) GetOkPercent() float64 {
367+
s.m.Lock()
368+
defer s.m.Unlock()
369+
374370
return float64(s.OkCount) / float64(s.TotalCount) * 100
375371
}
376372

377373
func (s *QueryStats) CountASOK(query string) {
374+
s.m.Lock()
375+
defer s.m.Unlock()
376+
378377
s.TotalCount++
379378
s.OkCount++
380379
}
381380

382381
func (s *QueryStats) CountAsKnown(ruleName string, query string) {
382+
s.m.Lock()
383+
defer s.m.Unlock()
384+
383385
s.TotalCount++
384386
if s.MatchToRules == nil {
385387
s.MatchToRules = make(map[string]*CounterWithExample[string])
@@ -402,6 +404,9 @@ func (s *QueryStats) CountAsKnown(ruleName string, query string) {
402404
}
403405

404406
func (s *QueryStats) CountAsUnknown(issues []internal.YdbIssue, query string) {
407+
s.m.Lock()
408+
defer s.m.Unlock()
409+
405410
s.TotalCount++
406411
if s.UnknownProblems == nil {
407412
s.UnknownProblems = make(map[internal.YdbIssue]*CounterWithExample[internal.YdbIssue])
@@ -425,14 +430,23 @@ func (s *QueryStats) CountAsUnknown(issues []internal.YdbIssue, query string) {
425430
}
426431

427432
func (s *QueryStats) GetTopKnown(count int) []CounterWithExample[string] {
433+
s.m.Lock()
434+
defer s.m.Unlock()
435+
428436
return getTopCounter(s.MatchToRules, count)
429437
}
430438

431439
func (s *QueryStats) GetTopUnknown(count int) []CounterWithExample[internal.YdbIssue] {
440+
s.m.Lock()
441+
defer s.m.Unlock()
442+
432443
return getTopCounter(s.UnknownProblems, count)
433444
}
434445

435446
func (s *QueryStats) PrintStats() {
447+
s.m.Lock()
448+
defer s.m.Unlock()
449+
436450
fmt.Println("Queries stat.")
437451
fmt.Println("Ok Count:", s.OkCount)
438452
fmt.Println()

tools/greenplum-to-pg-tests/cmd/issue_rules.go

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -74,19 +74,22 @@ func (r *Rules) WriteToFile(path string) error {
7474
return nil
7575
}
7676

77-
func (r *Rules) FindKnownIssue(queryText string, ydbIssues []internal.YdbIssue) (_ PgIssueRules, ok bool) {
77+
func (r *Rules) MatchToKnownIssues(queryText string, ydbIssues []internal.YdbIssue) ([]PgIssueRules, []internal.YdbIssue) {
78+
var res []PgIssueRules
79+
var restYdbIssues []internal.YdbIssue
80+
81+
ydbIssue:
7882
for _, ydbIssue := range ydbIssues {
7983
for _, item := range r.Issues {
8084
if item.IsMatched(queryText, ydbIssue) {
81-
if item.Skip {
82-
continue
83-
}
84-
return item, true
85+
res = append(res, item)
86+
continue ydbIssue
8587
}
8688
}
89+
restYdbIssues = append(restYdbIssues, ydbIssue)
8790
}
8891

89-
return PgIssueRules{}, false
92+
return res, restYdbIssues
9093
}
9194

9295
func (r *Rules) UpdateFromStats(stats QueryStats, sortByCount bool) {

0 commit comments

Comments
 (0)