WIP: Add incremental test scripts to misc folder #2117

Merged
merged 1 commit on Sep 12, 2016
189 changes: 189 additions & 0 deletions misc/analyze_cache.py
@@ -0,0 +1,189 @@
#!/usr/bin/env python3

from typing import Any, Dict, Generator, Iterable, List, Optional
from collections import Counter

import os
import os.path
import json

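# Cache directory written by mypy's incremental mode (mypy -i); the "3.5"
# subdirectory corresponds to the Python version being checked.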
ROOT = ".mypy_cache/3.5"

JsonDict = Dict[str, Any]

class CacheData:
    def __init__(self, filename: str, data_json: JsonDict, meta_json: JsonDict,
                 data_size: int, meta_size: int) -> None:
        self.filename = filename
        self.data = data_json
        self.meta = meta_json
        self.data_size = data_size
        self.meta_size = meta_size

    @property
    def total_size(self) -> int:
        return self.data_size + self.meta_size


def extract_classes(chunks: Iterable[CacheData]) -> Iterable[JsonDict]:
    def extract(chunks: Iterable[JsonDict]) -> Iterable[JsonDict]:
        for chunk in chunks:
            if isinstance(chunk, dict):
                yield chunk
                yield from extract(chunk.values())
            elif isinstance(chunk, list):
                yield from extract(chunk)
    yield from extract([chunk.data for chunk in chunks])


def load_json(data_path: str, meta_path: str) -> CacheData:
    with open(data_path, 'r') as ds:
        data_json = json.load(ds)

    with open(meta_path, 'r') as ms:
        meta_json = json.load(ms)

    data_size = os.path.getsize(data_path)
    meta_size = os.path.getsize(meta_path)

    return CacheData(data_path.replace(".data.json", ".*.json"),
                     data_json, meta_json, data_size, meta_size)


def get_files(root: str) -> Iterable[CacheData]:
    for (dirpath, dirnames, filenames) in os.walk(root):
        for filename in filenames:
            if filename.endswith(".data.json"):
                meta_filename = filename.replace(".data.json", ".meta.json")
                yield load_json(
                    os.path.join(dirpath, filename),
                    os.path.join(dirpath, meta_filename))


def pluck(name: str, chunks: Iterable[JsonDict]) -> Iterable[JsonDict]:
    return (chunk for chunk in chunks if chunk['.class'] == name)


def report_counter(counter: Counter, amount: Optional[int] = None) -> None:
    for name, count in counter.most_common(amount):
        print('  {: <8} {}'.format(count, name))
    print()


def report_most_common(chunks: List[JsonDict], amount: Optional[int] = None) -> None:
    report_counter(Counter(str(chunk) for chunk in chunks), amount)


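# The pair below deduplicates repeated sub-dicts: compress replaces every
# repeated dict (keyed by its string form) with a small {'.id': N} stub and
# tags the first occurrence with '.cache_id': N; decompress walks the result
# in the same sorted-key order and re-expands each stub from the first
# tagged occurrence it has already seen.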
def compress(chunk: JsonDict) -> JsonDict:
    cache = {}  # type: Dict[int, JsonDict]
    counter = 0
    def helper(chunk: Any) -> Any:
        nonlocal counter
        if not isinstance(chunk, dict):
            return chunk

        if len(chunk) <= 2:
            return chunk
        id = hash(str(chunk))

        if id in cache:
            return cache[id]
        else:
            cache[id] = {'.id': counter}
            chunk['.cache_id'] = counter
            counter += 1

        for name in sorted(chunk.keys()):
            value = chunk[name]
            if isinstance(value, list):
                chunk[name] = [helper(child) for child in value]
            elif isinstance(value, dict):
                chunk[name] = helper(value)

        return chunk
    out = helper(chunk)
    return out

def decompress(chunk: JsonDict) -> JsonDict:
    cache = {}  # type: Dict[int, JsonDict]
    def helper(chunk: Any) -> Any:
        if not isinstance(chunk, dict):
            return chunk
        if '.id' in chunk:
            return cache[chunk['.id']]

        counter = None
        if '.cache_id' in chunk:
            counter = chunk['.cache_id']
            del chunk['.cache_id']

        for name in sorted(chunk.keys()):
            value = chunk[name]
            if isinstance(value, list):
                chunk[name] = [helper(child) for child in value]
            elif isinstance(value, dict):
                chunk[name] = helper(value)

        if counter is not None:
            cache[counter] = chunk

        return chunk
    return helper(chunk)


def main() -> None:
    json_chunks = list(get_files(ROOT))
    class_chunks = list(extract_classes(json_chunks))

    total_size = sum(chunk.total_size for chunk in json_chunks)
    print("Total cache size: {:.3f} megabytes".format(total_size / (1024 * 1024)))
    print()

    class_name_counter = Counter(chunk[".class"] for chunk in class_chunks)
    print("Most commonly used classes:")
    report_counter(class_name_counter)

    print("Most common literal chunks:")
    report_most_common(class_chunks, 15)

    build = None  # type: Optional[CacheData]
    for chunk in json_chunks:
        if 'build.*.json' in chunk.filename:
            build = chunk
            break
    assert build is not None, "could not find the cache files for 'build'"
    original = json.dumps(build.data, sort_keys=True)
    print("Size of build.data.json, in kilobytes: {:.3f}".format(len(original) / 1024))

    build.data = compress(build.data)
    compressed = json.dumps(build.data, sort_keys=True)
    print("Size of compressed build.data.json, in kilobytes: {:.3f}".format(len(compressed) / 1024))

    build.data = decompress(build.data)
    decompressed = json.dumps(build.data, sort_keys=True)
    print("Size of decompressed build.data.json, in kilobytes: {:.3f}".format(len(decompressed) / 1024))

    print("Lossless conversion back:", original == decompressed)


'''var_chunks = list(pluck("Var", class_chunks))
report_most_common(var_chunks, 20)
print()

#for var in var_chunks:
# if var['fullname'] == 'self' and not (isinstance(var['type'], dict) and var['type']['.class'] == 'AnyType'):
# print(var)
#argument_chunks = list(pluck("Argument", class_chunks))

symbol_table_node_chunks = list(pluck("SymbolTableNode", class_chunks))
report_most_common(symbol_table_node_chunks, 20)

print()
print("Most common")
report_most_common(class_chunks, 20)
print()'''


if __name__ == '__main__':
    main()
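
As a sanity check of the compress/decompress pair, here is a minimal round-trip sketch; the nested dict is invented for illustration, and it assumes the definitions above are in scope (note that compress mutates its argument, hence the deepcopy):

from copy import deepcopy

node = {'.class': 'Var', 'name': 'x', 'type': {'.class': 'AnyType'}, 'flags': []}
tree = {'.class': 'SymbolTable', 'a': deepcopy(node), 'b': deepcopy(node)}

packed = compress(deepcopy(tree))    # 'b' collapses into a {'.id': N} stub
restored = decompress(packed)        # the stub is re-expanded on a second walk
assert json.dumps(restored, sort_keys=True) == json.dumps(tree, sort_keys=True)
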
93 changes: 93 additions & 0 deletions misc/perf_checker.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python3

from typing import Callable, List, Tuple

import os
import shutil
import statistics
import subprocess
import textwrap
import time


class Command:
    def __init__(self, setup: Callable[[], None], command: Callable[[], None]) -> None:
        self.setup = setup
        self.command = command


def print_offset(text: str, indent_length: int = 4) -> None:
    print()
    print(textwrap.indent(text, ' ' * indent_length))
    print()


def delete_folder(folder_path: str) -> None:
    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)


def execute(command: List[str]) -> None:
    proc = subprocess.Popen(
        ' '.join(command),
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        shell=True)
    stdout_bytes, stderr_bytes = proc.communicate()  # type: Tuple[bytes, bytes]
    stdout, stderr = stdout_bytes.decode('utf-8'), stderr_bytes.decode('utf-8')
    if proc.returncode != 0:
        print('EXECUTED COMMAND:', repr(command))
        print('RETURN CODE:', proc.returncode)
        print()
        print('STDOUT:')
        print_offset(stdout)
        print('STDERR:')
        print_offset(stderr)
        raise RuntimeError('Unexpected error from external tool.')


def trial(num_trials: int, command: Command) -> List[float]:
    trials = []
    for i in range(num_trials):
        command.setup()
        start = time.time()
        command.command()
        delta = time.time() - start
        trials.append(delta)
    return trials


def report(name: str, times: List[float]) -> None:
    print("{}:".format(name))
    print("  Times: {}".format(times))
    print("  Mean:  {}".format(statistics.mean(times)))
    print("  Stdev: {}".format(statistics.stdev(times)))
    print()


def main() -> None:
    trials = 3

    print("Testing baseline")
    baseline = trial(trials, Command(
        lambda: None,
        lambda: execute(["python3", "-m", "mypy", "mypy"])))
    report("Baseline", baseline)

    print("Testing cold cache")
    cold_cache = trial(trials, Command(
        lambda: delete_folder(".mypy_cache"),
        lambda: execute(["python3", "-m", "mypy", "-i", "mypy"])))
    report("Cold cache", cold_cache)

    print("Testing warm cache")
    execute(["python3", "-m", "mypy", "-i", "mypy"])
    warm_cache = trial(trials, Command(
        lambda: None,
        lambda: execute(["python3", "-m", "mypy", "-i", "mypy"])))
    report("Warm cache", warm_cache)


if __name__ == '__main__':
    main()
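
As a usage sketch, the same helpers can time other targets. This hypothetical extra benchmark (the single-file target is invented) assumes it runs from a mypy checkout, like main() above:

single = trial(3, Command(
    lambda: delete_folder(".mypy_cache"),   # force a cold cache on each run
    lambda: execute(["python3", "-m", "mypy", "-i", "mypy/build.py"])))
report("Cold cache, single file", single)
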

71 changes: 71 additions & 0 deletions misc/test_case_to_actual.py
@@ -0,0 +1,71 @@
from typing import Iterator, List
import sys
import os
import os.path


class Chunk:
    def __init__(self, header_type: str, args: str) -> None:
        self.header_type = header_type
        self.args = args
        self.lines = []  # type: List[str]


def is_header(line: str) -> bool:
    return line.startswith('[') and line.endswith(']')


def normalize(lines: Iterator[str]) -> Iterator[str]:
    return (line.rstrip() for line in lines)


def produce_chunks(lines: Iterator[str]) -> Iterator[Chunk]:
    current_chunk = None  # type: Chunk
    for line in normalize(lines):
        if is_header(line):
            if current_chunk is not None:
                yield current_chunk
            parts = line[1:-1].split(' ', 1)
            args = parts[1] if len(parts) > 1 else ''
            current_chunk = Chunk(parts[0], args)
        else:
            current_chunk.lines.append(line)
    if current_chunk is not None:
        yield current_chunk


def write_out(filename: str, lines: List[str]) -> None:
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'w') as stream:
        stream.write('\n'.join(lines))


def write_tree(root: str, chunks: Iterator[Chunk]) -> None:
    init = next(chunks)
    assert init.header_type == 'case'

    root = os.path.join(root, init.args)
    write_out(os.path.join(root, 'main.py'), init.lines)

    for chunk in chunks:
        if chunk.header_type == 'file' and chunk.args.endswith('.py'):
            write_out(os.path.join(root, chunk.args), chunk.lines)


def help() -> None:
    print("Usage: python misc/test_case_to_actual.py test_file.txt root_path")


def main() -> None:
    if len(sys.argv) != 3:
        help()
        return

    test_file_path, root_path = sys.argv[1], sys.argv[2]
    with open(test_file_path, 'r') as stream:
        chunks = produce_chunks(iter(stream))
        write_tree(root_path, chunks)


if __name__ == '__main__':
    main()
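
A usage sketch (the file names here are invented): given a demo.test file containing

    [case testBasic]
    from mod import f
    f(1)
    [file mod.py]
    def f(x: int) -> None: pass

running python misc/test_case_to_actual.py demo.test /tmp/out writes the case body to /tmp/out/testBasic/main.py and the [file mod.py] body to /tmp/out/testBasic/mod.py. The same split can be done programmatically with the functions above:

with open('demo.test', 'r') as stream:
    write_tree('/tmp/out', produce_chunks(iter(stream)))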