import SimpleHTTPServer import SocketServer import argparse import logging from math import sqrt from operator import mul import os import json from subprocess import call, check_output import sys import threading import time # Supress logs from feedback server logging.getLogger().setLevel(logging.FATAL) class DefaultLaunchTimeHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): def get_test_page(self): return ''' Launch Time Benchmark

New Tab Benchmark

''' def on_receive_stop_signal(self, data): pass def do_HEAD(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() def do_GET(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() if not self.path.startswith('/blank'): self.wfile.write(self.get_test_page()) self.wfile.close() def do_POST(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write('done') self.wfile.close() data_string = self.rfile.read(int(self.headers['Content-Length'])) self.on_receive_stop_signal(data_string) def log_message(self, format, *args): pass class LaunchTimeBenchmark: def __init__(self): self._server_ready = threading.Semaphore(0) self._server = None self._server_thread = None self._port = 8080 self._feedback_port = None self._feedback_server = None self._open_count = 0 self._app_name = None self._verbose = False self._feedback_in_browser = False self._save_results_to_json = False self._json_results_path = None self._do_not_ignore_first_result = False self._iterations = 5 self._browser_bundle_path = '/Applications/Safari.app' self.response_handler = None self.benchmark_description = None self.use_geometric_mean = False self.wait_time_high = 1 self.wait_time_low = 0.1 self.iteration_groups = 1 self.initialize() def _parse_browser_bundle_path(self, path): if not os.path.isdir(path) or not path.endswith('.app'): raise argparse.ArgumentTypeError( 'Invalid app bundle path: "{}"'.format(path)) return path def _parse_args(self): self.argument_parser = argparse.ArgumentParser(description=self.benchmark_description) self.argument_parser.add_argument('-p', '--path', type=self._parse_browser_bundle_path, help='path for browser application bundle (default: {})'.format(self._browser_bundle_path)) self.argument_parser.add_argument('-n', '--iterations', type=int, help='number of iterations of test (default: {})'.format(self._iterations)) self.argument_parser.add_argument('-v', '--verbose', action='store_true', help="print each iteration's time") self.argument_parser.add_argument('-f', '--feedback-in-browser', action='store_true', help="show benchmark results in browser (default: {})".format(self._feedback_in_browser)) self.argument_parser.add_argument('-o', '--output', type=self._json_results_path, help='saves benchmark results in json format (default: {})'.format(self._json_results_path)) self.will_parse_arguments() args = self.argument_parser.parse_args() if args.iterations: self._iterations = args.iterations if args.path: self._browser_bundle_path = args.path if args.verbose is not None: self._verbose = args.verbose if args.feedback_in_browser is not None: self._feedback_in_browser = args.feedback_in_browser if args.output: self._save_results_to_json = True self._json_results_path = args.output path_len = len(self._browser_bundle_path) start_index = self._browser_bundle_path.rfind('/', 0, path_len) end_index = self._browser_bundle_path.rfind('.', 0, path_len) self._app_name = self._browser_bundle_path[start_index + 1:end_index] self.did_parse_arguments(args) def _run_server(self): self._server_ready.release() self._server.serve_forever() def _setup_servers(self): while True: try: self._server = SocketServer.TCPServer( ('0.0.0.0', self._port), self.response_handler) break except: self._port += 1 print 'Running test server at http://localhost:{}'.format(self._port) self._server_thread = threading.Thread(target=self._run_server) self._server_thread.start() self._server_ready.acquire() if self._feedback_in_browser: from feedback_server import FeedbackServer self._feedback_server = FeedbackServer() self._feedback_port = self._feedback_server.start() def _clean_up(self): self._server.shutdown() self._server_thread.join() if self._feedback_in_browser: self._feedback_server.stop() def _exit_due_to_exception(self, reason): self.log(reason) self._clean_up() sys.exit(1) def _geometric_mean(self, values): product = reduce(mul, values) return product ** (1.0 / len(values)) def _standard_deviation(self, results, mean=None): if mean is None: mean = sum(results) / float(len(results)) divisor = float(len(results) - 1) if len(results) > 1 else float(len(results)) variance = sum((x - mean) ** 2 for x in results) / divisor return sqrt(variance) def _compute_results(self, results): if not results: self._exit_due_to_exception('No results to compute.\n') if len(results) > 1 and not self._do_not_ignore_first_result: results = results[1:] mean = sum(results) / float(len(results)) stdev = self._standard_deviation(results, mean) return mean, stdev def _wait_times(self): if self.iteration_groups == 1: yield self.wait_time_high return increment_per_group = float(self.wait_time_high - self.wait_time_low) / (self.iteration_groups - 1) for i in range(self.iteration_groups): yield self.wait_time_low + increment_per_group * i def open_tab(self, blank=False): if blank: call(['open', '-a', self._browser_bundle_path, 'http://localhost:{}/blank/{}'.format(self._port, self._open_count)]) else: call(['open', '-a', self._browser_bundle_path, 'http://localhost:{}/{}'.format(self._port, self._open_count)]) self._open_count += 1 def launch_browser(self): if self._feedback_in_browser: call(['open', '-a', self._browser_bundle_path, 'http://localhost:{}'.format(self._feedback_port), '-F']) self._feedback_server.wait_until_client_has_loaded() else: call(['open', '-a', self._browser_bundle_path, 'http://localhost:{}/blank'.format(self._port), '-F']) self.wait(2) def quit_browser(self): def quit_app(): call(['osascript', '-e', 'quit app "{}"'.format(self._browser_bundle_path)]) def is_app_closed(): out = check_output(['osascript', '-e', 'tell application "System Events"', '-e', 'copy (get name of every process whose name is "{}") to stdout'.format(self._app_name), '-e', 'end tell']) return len(out.strip()) == 0 while not is_app_closed(): quit_app() self.wait(1) def close_tab(self): call(['osascript', '-e', 'tell application "System Events" to keystroke "w" using command down']) def wait(self, duration): wait_start = time.time() while time.time() - wait_start < duration: pass def log(self, message): if self._feedback_in_browser: self._feedback_server.send_message(message) sys.stdout.write(message) sys.stdout.flush() def log_verbose(self, message): if self._verbose: self.log(message) def run(self): self._parse_args() self._setup_servers() self.quit_browser() print '' try: group_means = [] if self._save_results_to_json: resultsDict = {self.get_test_name(): {"metrics": {"Time": {"current": []}}}} results_by_iteration_number = [[] for _ in range(self._iterations)] group = 1 for wait_duration in self._wait_times(): self.group_init() if self.iteration_groups > 1: self.log('Running group {}{}'.format(group, ':\n' if self._verbose else '...')) results = [] for i in range(self._iterations): try: if not self._verbose: self.log('.') result_in_ms = self.run_iteration() self.log_verbose('({}) {} ms\n'.format(i + 1, result_in_ms)) self.wait(wait_duration) results.append(result_in_ms) results_by_iteration_number[i].append(result_in_ms) except KeyboardInterrupt: raise KeyboardInterrupt except Exception as error: self._exit_due_to_exception('(Test {} failed) {}: {}\n'.format(i + 1 if self._verbose else i, type(error).__name__, error)) if not self._verbose: print '' if self._save_results_to_json: resultsDict[self.get_test_name()]["metrics"]["Time"]["current"].append(results) mean, stdev = self._compute_results(results) self.log_verbose('RESULTS:\n') self.log_verbose('mean: {} ms\n'.format(mean)) self.log_verbose('std dev: {} ms ({}%)\n\n'.format(stdev, (stdev / mean) * 100)) if self._verbose: self.wait(1) group_means.append(mean) group += 1 self.quit_browser() if not self._verbose: print '\n' if self._feedback_in_browser: self.launch_browser() if self._save_results_to_json and self._json_results_path: with open(self._json_results_path, "w") as jsonFile: json.dump(resultsDict, jsonFile, indent=4, separators=(',', ': ')) means_by_iteration_number = [] if len(results_by_iteration_number) > 1 and not self._do_not_ignore_first_result: results_by_iteration_number = results_by_iteration_number[1:] for iteration_results in results_by_iteration_number: means_by_iteration_number.append(self._geometric_mean(iteration_results)) final_mean = self._geometric_mean(group_means) final_stdev = self._standard_deviation(means_by_iteration_number) self.log('FINAL RESULTS\n') self.log('Mean:\n-> {} ms\n'.format(final_mean)) self.log('Standard Deviation:\n-> {} ms ({}%)\n'.format(final_stdev, (final_stdev / final_mean) * 100)) except KeyboardInterrupt: self._clean_up() sys.exit(1) finally: self._clean_up() def group_init(self): pass def run_iteration(self): pass def initialize(self): pass def will_parse_arguments(self): pass def did_parse_arguments(self, args): pass def get_test_name(self): return "LaunchTimeBenchmark"