Skip to content

Commit 321c2e2

Browse files
committed
read sorted items
1 parent e6d2009 commit 321c2e2

File tree

3 files changed

+186
-40
lines changed

3 files changed

+186
-40
lines changed

tools/greenplum-to-pg-tests/cmd/check_pg_queries.go

Lines changed: 158 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Copyright © 2024 NAME HERE <EMAIL ADDRESS>
44
package cmd
55

66
import (
7+
"compress/gzip"
78
"context"
89
"encoding/json"
910
"errors"
@@ -16,6 +17,7 @@ import (
1617
"slices"
1718
"strings"
1819
"sync"
20+
"sync/atomic"
1921
"time"
2022

2123
"github.com/spf13/cobra"
@@ -29,6 +31,7 @@ import (
2931
var checkPgQueriesConfig struct {
3032
schemeDumpFile string
3133
sessionsLog string
34+
sessionsLogNeedSort bool
3235
includeFailed bool
3336
ydbConnectionString string
3437
limitRequests int
@@ -41,6 +44,7 @@ var checkPgQueriesConfig struct {
4144
printStats bool
4245
printProgressEveryQueries int
4346
writeStatPath string
47+
writeStatEveryItems int
4448
checkersCount int
4549
}
4650

@@ -49,6 +53,7 @@ func init() {
4953

5054
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.schemeDumpFile, "schemedump-file", "", "Path to dump of db schema. Set empty for skip read schema.")
5155
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.sessionsLog, "query-log", "", "Set path to input sessions log")
56+
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.sessionsLogNeedSort, "query-log-need-sort", false, "Sort query log in memory before start")
5257
must0(checkPgQueriesCmd.MarkPersistentFlagRequired("query-log"))
5358

5459
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.includeFailed, "include-failed", false, "Extract sessions with failed transactions")
@@ -61,8 +66,10 @@ func init() {
6166
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printQueryForKnownIssue, "print-query-for-known-issues", true, "Print query for known issues")
6267
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printErrorsInProgress, "print-progress", false, "Print queries in progress")
6368
checkPgQueriesCmd.PersistentFlags().BoolVar(&checkPgQueriesConfig.printStats, "print-stats", true, "Print queries in progress")
64-
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.printProgressEveryQueries, "print-progress-every-queries", 10, "Periodically print progress")
69+
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.printProgressEveryQueries, "print-progress-every-queries", 100, "Periodically print progress")
6570
checkPgQueriesCmd.PersistentFlags().StringVar(&checkPgQueriesConfig.writeStatPath, "write-stat-file", "", "Path to write full stat file if need. Will write example of queries")
71+
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.writeStatEveryItems, "write-stat-every-items", 10000, "Interval for write current stat")
72+
6673
checkPgQueriesCmd.PersistentFlags().IntVar(&checkPgQueriesConfig.checkersCount, "check-queries-parallel", 5, "How many queries may be checked in parallel")
6774
}
6875

@@ -105,8 +112,13 @@ var checkPgQueriesCmd = &cobra.Command{
105112
))
106113
cancel()
107114

108-
sessions := readSessions()
109-
queries := extractQueries(sessions)
115+
var queries <-chan string
116+
fileReader := openFileReader()
117+
if checkPgQueriesConfig.sessionsLogNeedSort {
118+
queries = generateQueriesFromUnsortedSessions(fileReader)
119+
} else {
120+
queries = readSortedQueries(fileReader)
121+
}
110122

111123
log.Println("Start check queries")
112124
var stats QueryStats
@@ -127,8 +139,105 @@ var checkPgQueriesCmd = &cobra.Command{
127139
},
128140
}
129141

130-
func readSessions() []internal.Session {
131-
reader := must(os.Open(checkPgQueriesConfig.sessionsLog))
142+
func openFileReader() io.ReadCloser {
143+
filepath := checkPgQueriesConfig.sessionsLog
144+
fileReader, err := os.Open(filepath)
145+
if err != nil {
146+
log.Fatalf("Failed to open file %q: %v", filepath, err)
147+
}
148+
149+
if strings.HasSuffix(strings.ToLower(filepath), ".gz") {
150+
gzipReader, err := gzip.NewReader(fileReader)
151+
if err != nil {
152+
log.Fatalf("Failed to start gzip reader for %q: %v", filepath, err)
153+
}
154+
return gzipReaderClose{
155+
gzipReader: gzipReader,
156+
fileReader: fileReader,
157+
}
158+
}
159+
160+
return fileReader
161+
}
162+
163+
type gzipReaderClose struct {
164+
gzipReader *gzip.Reader
165+
fileReader *os.File
166+
}
167+
168+
func (g gzipReaderClose) Read(p []byte) (n int, err error) {
169+
return g.gzipReader.Read(p)
170+
}
171+
172+
func (g gzipReaderClose) Close() error {
173+
gzipCloseErr := g.gzipReader.Close()
174+
fileCloseErr := g.fileReader.Close()
175+
176+
if gzipCloseErr != nil {
177+
return gzipCloseErr
178+
}
179+
180+
return fileCloseErr
181+
}
182+
183+
func readSortedQueries(reader io.ReadCloser) <-chan string {
184+
queries := make(chan string)
185+
go func() {
186+
defer reader.Close()
187+
defer close(queries)
188+
189+
decoder := json.NewDecoder(reader)
190+
limitCount := checkPgQueriesConfig.limitRequests
191+
counter := 0
192+
193+
needDeleteLine := false
194+
for {
195+
if limitCount > 0 && counter >= limitCount {
196+
log.Println("Count limit reached")
197+
return
198+
}
199+
200+
var item internal.SessionLogRecord
201+
if err := decoder.Decode(&item); err != nil {
202+
switch {
203+
case errors.Is(err, io.EOF):
204+
log.Printf("Read file completed, read items: %v", counter)
205+
return
206+
case err != nil:
207+
log.Printf("Failed to decode item %v: %v", counter, err)
208+
return
209+
default:
210+
// pass
211+
}
212+
}
213+
214+
queries <- item.Query
215+
counter++
216+
if counter%checkPgQueriesConfig.printProgressEveryQueries == 0 {
217+
if needDeleteLine {
218+
printDeleteLine()
219+
} else {
220+
needDeleteLine = true
221+
}
222+
223+
var percent float64
224+
if limitCount > 0 {
225+
percent = float64(counter) / float64(limitCount) * 100
226+
}
227+
log.Printf("Read items %v/%v (%0.2f)", counter, limitCount, percent)
228+
}
229+
}
230+
}()
231+
232+
return queries
233+
}
234+
235+
func generateQueriesFromUnsortedSessions(reader io.ReadCloser) <-chan string {
236+
sessions := readSessions(reader)
237+
return extractQueries(sessions)
238+
}
239+
240+
func readSessions(reader io.ReadCloser) []internal.Session {
132241
defer reader.Close()
133242

134243
decoder := json.NewDecoder(reader)
@@ -241,7 +350,7 @@ func extractQueries(sessions []internal.Session) <-chan string {
241350
if queryIndex%checkPgQueriesConfig.printProgressEveryQueries == 0 {
242351
percent := float64(queryIndex) / float64(totalQueries) * 100
243352
if needRemoveLine {
244-
fmt.Printf("\033[1A\033[K")
353+
printDeleteLine()
245354
} else {
246355
needRemoveLine = true
247356
}
@@ -258,18 +367,30 @@ func extractQueries(sessions []internal.Session) <-chan string {
258367
return queries
259368
}
260369

370+
func printDeleteLine() {
371+
fmt.Printf("\033[1A\033[K")
372+
}
373+
261374
func checkQueries(rules Rules, stats *QueryStats, db *ydb.Driver, queries <-chan string) {
262375
if checkPgQueriesConfig.checkersCount < 1 {
263376
log.Fatalf("can't start less then 1 checker, got: %v", checkPgQueriesConfig.checkersCount)
264377
}
265378

379+
var itemsCounter atomic.Int64
380+
writeStatEveryItems := int64(checkPgQueriesConfig.writeStatEveryItems)
266381
var wg sync.WaitGroup
267382
for range checkPgQueriesConfig.checkersCount {
268383
wg.Add(1)
269384
go func() {
270385
defer wg.Done()
271386
for q := range queries {
272387
checkQuery(stats, rules, db, q)
388+
counter := itemsCounter.Add(1)
389+
if counter%writeStatEveryItems == 0 && checkPgQueriesConfig.writeStatPath != "" {
390+
if err := stats.SaveToFile(checkPgQueriesConfig.writeStatPath); err != nil {
391+
log.Printf("Stat file written failed %q: %v", checkPgQueriesConfig.writeStatPath, err)
392+
}
393+
}
273394
}
274395
}()
275396
}
@@ -356,12 +477,14 @@ func cutGreenplumSpecific(q string) string {
356477

357478
var (
358479
createAndDistributedByWithBrackets = regexp.MustCompile(`(?is)CREATE\s+.*\sTABLE\s+.*\s+AS\s+\(\s*(.*)\s*\)\s+DISTRIBUTED\s+BY\s\(.*\)`)
359-
createTableAsSelect = regexp.MustCompile(`(?i)create\s+(temporary\s+)?table .* as`)
480+
createTableAsSelect = regexp.MustCompile(`(?is)create\s+(temporary\s+)?table .* as`)
360481
distributedBy = regexp.MustCompile(`(?i)DISTRIBUTED BY \(.*\)`)
361482
)
362483

363484
type QueryStats struct {
364-
m sync.Mutex
485+
m sync.RWMutex
486+
writeStatMutex sync.Mutex
487+
365488
OkCount int
366489
TotalCount int
367490

@@ -370,9 +493,13 @@ type QueryStats struct {
370493
}
371494

372495
func (s *QueryStats) GetOkPercent() float64 {
373-
s.m.Lock()
374-
defer s.m.Unlock()
496+
s.m.RLock()
497+
defer s.m.RUnlock()
498+
499+
return s.getOkPercentNeedLock()
500+
}
375501

502+
func (s *QueryStats) getOkPercentNeedLock() float64 {
376503
return float64(s.OkCount) / float64(s.TotalCount) * 100
377504
}
378505

@@ -434,16 +561,24 @@ func (s *QueryStats) CountAsUnknown(reason string, query string) {
434561
}
435562

436563
func (s *QueryStats) GetTopKnown(count int) []CounterWithExample[string] {
437-
s.m.Lock()
438-
defer s.m.Unlock()
564+
s.m.RLock()
565+
defer s.m.RUnlock()
439566

567+
return s.getTopKnownNeedLock(count)
568+
}
569+
570+
func (s *QueryStats) getTopKnownNeedLock(count int) []CounterWithExample[string] {
440571
return getTopCounter(s.MatchToRules, count)
441572
}
442573

443574
func (s *QueryStats) GetTopUnknown(count int) []CounterWithExample[string] {
444-
s.m.Lock()
445-
defer s.m.Unlock()
575+
s.m.RLock()
576+
defer s.m.RUnlock()
446577

578+
return s.getTopUnknownNeedLock(count)
579+
}
580+
581+
func (s *QueryStats) getTopUnknownNeedLock(count int) []CounterWithExample[string] {
447582
return getTopCounter(s.UnknownProblems, count)
448583
}
449584

@@ -497,6 +632,12 @@ func getTopCounter[K comparable](m map[K]*CounterWithExample[K], count int) []Co
497632
}
498633

499634
func (s *QueryStats) SaveToFile(path string) error {
635+
s.writeStatMutex.Lock()
636+
defer s.writeStatMutex.Unlock()
637+
638+
s.m.RLock()
639+
defer s.m.RUnlock()
640+
500641
var statFile struct {
501642
TotalCount int `yaml:"total_count"`
502643
OkCount int `yaml:"ok_count"`
@@ -507,9 +648,9 @@ func (s *QueryStats) SaveToFile(path string) error {
507648

508649
statFile.TotalCount = s.TotalCount
509650
statFile.OkCount = s.OkCount
510-
statFile.OkPercent = s.GetOkPercent()
511-
statFile.UnknownIssues = s.GetTopUnknown(math.MaxInt)
512-
statFile.KnownIssues = s.GetTopKnown(math.MaxInt)
651+
statFile.OkPercent = s.getOkPercentNeedLock()
652+
statFile.UnknownIssues = s.getTopUnknownNeedLock(math.MaxInt)
653+
statFile.KnownIssues = s.getTopKnownNeedLock(math.MaxInt)
513654

514655
for i := range statFile.UnknownIssues {
515656
statFile.UnknownIssues[i].Example = cleanStringForLiteralYaml(statFile.UnknownIssues[i].Example)

tools/greenplum-to-pg-tests/cmd/check_pg_queries_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ func TestFixSchemaName(t *testing.T) {
1414
}{
1515
{
1616
from: `DELETE
17-
FROM "eda_dds_partner"."a__item__item_name__h2" t
18-
USING "for_delete_extend" AS base
17+
FROM "asd"."sss" t
18+
USING "kkk" AS base
1919
WHERE`,
2020
result: `DELETE
21-
FROM eda_dds_partner___a__item__item_name__h2 t
22-
USING "for_delete_extend" AS base
21+
FROM asd___sss t
22+
USING "kkk" AS base
2323
WHERE`,
2424
},
2525
}
@@ -40,28 +40,28 @@ func TestCutGreenplumSpecific(t *testing.T) {
4040
}{
4141
{
4242
name: "CreateAs",
43-
from: `create table snb_eagle___hex_lavka_orders as
43+
from: `create table aaa as
4444
select *
45-
from snb_geo_lavka___hex_lavka_orders;
45+
from bbb;
4646
`,
4747
to: `
4848
select *
49-
from snb_geo_lavka___hex_lavka_orders;
49+
from bbb;
5050
`,
5151
},
5252
{
5353
name: "CreateAndDistributedBy",
54-
from: ` CREATE TEMPORARY TABLE updated_tickets
54+
from: ` CREATE TEMPORARY TABLE t
5555
AS (
56-
SELECT DISTINCT chatterbox_ticket_id AS ticket_id
57-
FROM taxi_cdm_contactcenter___fct_chatterbox_ticket_event --fct_chatterbox_ticket_event
58-
WHERE utc_event_dttm BETWEEN ''2024-04-10 00:00:00''::timestamp AND ''2024-04-30 23:59:59''::timestamp
56+
SELECT DISTINCT b AS ticket_id
57+
FROM t2 --comment
58+
WHERE mytime BETWEEN ''2024-04-10 00:00:00''::timestamp AND ''2024-04-30 23:59:59''::timestamp
5959
)
6060
DISTRIBUTED BY (ticket_id);
6161
`,
62-
to: ` SELECT DISTINCT chatterbox_ticket_id AS ticket_id
63-
FROM taxi_cdm_contactcenter___fct_chatterbox_ticket_event --fct_chatterbox_ticket_event
64-
WHERE utc_event_dttm BETWEEN ''2024-04-10 00:00:00''::timestamp AND ''2024-04-30 23:59:59''::timestamp
62+
to: ` SELECT DISTINCT b AS ticket_id
63+
FROM t2 --comment
64+
WHERE mytime BETWEEN ''2024-04-10 00:00:00''::timestamp AND ''2024-04-30 23:59:59''::timestamp
6565
;
6666
`,
6767
},

0 commit comments

Comments
 (0)