|
21 | 21 | ).get_caller_identity()["Account"]
|
22 | 22 | bucket_name = "sagemaker-us-west-2-%s" % account
|
23 | 23 |
|
| 24 | +MAX_IN_PROGRESS_BUILDS = 3 |
| 25 | +INTERVAL_BETWEEN_CONCURRENT_RUNS = 15 # minutes |
| 26 | +CLEAN_UP_TICKETS_OLDER_THAN = 8 # hours |
| 27 | + |
24 | 28 |
|
25 | 29 | def queue_build():
|
26 |
| - build_id = os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID") |
27 |
| - source_version = os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION").replace( |
28 |
| - "/", "-" |
29 |
| - ) |
30 | 30 | ticket_number = int(1000 * time.time())
|
31 |
| - filename = "%s_%s_%s" % (ticket_number, build_id, source_version) |
32 |
| - |
33 |
| - print("Created queue ticket %s" % ticket_number) |
34 |
| - |
35 |
| - _write_ticket(filename) |
36 | 31 | files = _list_tickets()
|
37 |
| - _cleanup_tickets_older_than_8_hours(files) |
38 |
| - _wait_for_other_builds(files, ticket_number) |
| 32 | + _cleanup_tickets_older_than(files) |
| 33 | + _wait_for_other_builds(ticket_number) |
39 | 34 |
|
40 | 35 |
|
41 | 36 | def _build_info_from_file(file):
|
42 |
| - filename = file.key.split("/")[1] |
| 37 | + filename = file.key.split("/")[2] |
43 | 38 | ticket_number, build_id, source_version = filename.split("_")
|
44 | 39 | return int(ticket_number), build_id, source_version
|
45 | 40 |
|
46 | 41 |
|
47 |
| -def _wait_for_other_builds(files, ticket_number): |
48 |
| - newfiles = list(filter(lambda file: not _file_older_than(file), files)) |
49 |
| - sorted_files = list(sorted(newfiles, key=lambda y: y.key)) |
| 42 | +def _wait_for_other_builds(ticket_number): |
| 43 | + sorted_files = _list_tickets() |
50 | 44 |
|
51 | 45 | print("build queue status:")
|
52 | 46 | print()
|
53 | 47 |
|
54 | 48 | for order, file in enumerate(sorted_files):
|
55 | 49 | file_ticket_number, build_id, source_version = _build_info_from_file(file)
|
56 | 50 | print(
|
57 |
| - "%s -> %s %s, ticket number: %s" % (order, build_id, source_version, file_ticket_number) |
| 51 | + "%s -> %s %s, ticket number: %s status: %s" |
| 52 | + % (order, build_id, source_version, file_ticket_number, file.key.split("/")[1]) |
58 | 53 | )
|
| 54 | + print() |
| 55 | + build_id = os.environ.get("CODEBUILD_BUILD_ID", "CODEBUILD-BUILD-ID") |
| 56 | + source_version = os.environ.get("CODEBUILD_SOURCE_VERSION", "CODEBUILD-SOURCE-VERSION").replace( |
| 57 | + "/", "-" |
| 58 | + ) |
| 59 | + filename = "%s_%s_%s" % (ticket_number, build_id, source_version) |
| 60 | + s3_file_obj = _write_ticket(filename, status="waiting") |
| 61 | + print("Build %s waiting to be scheduled" % filename) |
| 62 | + |
| 63 | + while True: |
| 64 | + _cleanup_tickets_with_terminal_states() |
| 65 | + waiting_tickets = _list_tickets("waiting") |
| 66 | + if waiting_tickets: |
| 67 | + first_waiting_ticket_number, _, _ = _build_info_from_file(_list_tickets("waiting")[0]) |
| 68 | + else: |
| 69 | + first_waiting_ticket_number = ticket_number |
| 70 | + |
| 71 | + if ( |
| 72 | + len(_list_tickets(status="in-progress")) < 3 |
| 73 | + and last_in_progress_elapsed_time_check() |
| 74 | + and first_waiting_ticket_number == ticket_number |
| 75 | + ): |
| 76 | + # put the build in progress |
| 77 | + print("Scheduling build %s for running.." % filename) |
| 78 | + s3_file_obj.delete() |
| 79 | + _write_ticket(filename, status="in-progress") |
| 80 | + break |
| 81 | + else: |
| 82 | + # wait |
| 83 | + time.sleep(30) |
59 | 84 |
|
60 |
| - for file in sorted_files: |
61 |
| - file_ticket_number, build_id, source_version = _build_info_from_file(file) |
62 | 85 |
|
63 |
| - if file_ticket_number == ticket_number: |
| 86 | +def last_in_progress_elapsed_time_check(): |
| 87 | + in_progress_tickets = _list_tickets("in-progress") |
| 88 | + if not in_progress_tickets: |
| 89 | + return True |
| 90 | + last_in_progress_ticket, _, _ = _build_info_from_file(_list_tickets("in-progress")[-1]) |
| 91 | + _elapsed_time = int(1000 * time.time()) - last_in_progress_ticket |
| 92 | + last_in_progress_elapsed_time = int(_elapsed_time / (1000 * 60)) # in minutes |
| 93 | + return last_in_progress_elapsed_time > INTERVAL_BETWEEN_CONCURRENT_RUNS |
64 | 94 |
|
65 |
| - break |
66 |
| - else: |
67 |
| - while True: |
68 |
| - client = boto3.client("codebuild") |
69 |
| - response = client.batch_get_builds(ids=[build_id]) |
70 |
| - build_status = response["builds"][0]["buildStatus"] |
71 |
| - |
72 |
| - if build_status == "IN_PROGRESS": |
73 |
| - print( |
74 |
| - "waiting on build %s %s %s" % (build_id, source_version, file_ticket_number) |
75 |
| - ) |
76 |
| - time.sleep(30) |
77 |
| - else: |
78 |
| - print("build %s finished, deleting lock" % build_id) |
79 |
| - file.delete() |
80 |
| - break |
81 |
| - |
82 |
| - |
83 |
| -def _cleanup_tickets_older_than_8_hours(files): |
| 95 | + |
| 96 | +def _cleanup_tickets_with_terminal_states(): |
| 97 | + files = _list_tickets() |
| 98 | + for file in files: |
| 99 | + _, build_id, _ = _build_info_from_file(file) |
| 100 | + client = boto3.client("codebuild") |
| 101 | + response = client.batch_get_builds(ids=[build_id]) |
| 102 | + build_status = response["builds"][0]["buildStatus"] |
| 103 | + |
| 104 | + if build_status != "IN_PROGRESS": |
| 105 | + print("Build %s in terminal state: %s, deleting lock" % (build_id, build_status)) |
| 106 | + file.delete() |
| 107 | + |
| 108 | + |
| 109 | +def _cleanup_tickets_older_than(files): |
84 | 110 | oldfiles = list(filter(_file_older_than, files))
|
85 | 111 | for file in oldfiles:
|
86 | 112 | print("object %s older than 8 hours. Deleting" % file.key)
|
87 | 113 | file.delete()
|
88 | 114 | return files
|
89 | 115 |
|
90 | 116 |
|
91 |
| -def _list_tickets(): |
| 117 | +def _list_tickets(status=None): |
92 | 118 | s3 = boto3.resource("s3")
|
93 | 119 | bucket = s3.Bucket(bucket_name)
|
94 |
| - objects = [file for file in bucket.objects.filter(Prefix="ci-lock/")] |
95 |
| - files = list(filter(lambda x: x != "ci-lock/", objects)) |
96 |
| - return files |
| 120 | + prefix = "ci-integ-queue/{}/".format(status) if status else "ci-integ-queue/" |
| 121 | + objects = [file for file in bucket.objects.filter(Prefix=prefix)] |
| 122 | + files = list(filter(lambda x: x != prefix, objects)) |
| 123 | + sorted_files = list(sorted(files, key=lambda y: y.key)) |
| 124 | + return sorted_files |
97 | 125 |
|
98 | 126 |
|
99 | 127 | def _file_older_than(file):
|
100 |
| - timelimit = 1000 * 60 * 60 * 8 |
101 |
| - |
| 128 | + timelimit = 1000 * 60 * 60 * CLEAN_UP_TICKETS_OLDER_THAN |
102 | 129 | file_ticket_number, build_id, source_version = _build_info_from_file(file)
|
| 130 | + return int(1000 * time.time()) - file_ticket_number > timelimit |
103 | 131 |
|
104 |
| - return int(time.time()) - file_ticket_number > timelimit |
105 |
| - |
106 |
| - |
107 |
| -def _write_ticket(ticket_number): |
108 | 132 |
|
109 |
| - if not os.path.exists("ci-lock"): |
110 |
| - os.mkdir("ci-lock") |
| 133 | +def _write_ticket(filename, status="waiting"): |
| 134 | + file_path = "ci-integ-queue/{}".format(status) |
| 135 | + if not os.path.exists(file_path): |
| 136 | + os.makedirs(file_path) |
111 | 137 |
|
112 |
| - filename = "ci-lock/" + ticket_number |
113 |
| - with open(filename, "w") as file: |
114 |
| - file.write(ticket_number) |
115 |
| - boto3.Session().resource("s3").Object(bucket_name, filename).upload_file(filename) |
| 138 | + file_full_path = file_path + "/" + filename |
| 139 | + with open(file_full_path, "w") as file: |
| 140 | + file.write(filename) |
| 141 | + s3_file_obj = boto3.Session().resource("s3").Object(bucket_name, file_full_path) |
| 142 | + s3_file_obj.upload_file(file_full_path) |
| 143 | + print("Build %s is now in state %s" % (filename, status)) |
| 144 | + return s3_file_obj |
116 | 145 |
|
117 | 146 |
|
118 | 147 | if __name__ == "__main__":
|
|
0 commit comments