# blob: 7de1dd444e0c816f52f2ce65c039d3dd7c78f744 [file] [log] [blame]
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import pathlib
from typing import TextIO, List, Dict, Generator, Optional
from absl import logging
def _git_long(verb: str, *args, git_flag: Optional[str] = None, cwd=None, input=None) -> str:
    """Run ``git [git_flag] <verb> <args...>`` and return its captured output.

    The assembled command line is echoed to stdout before it is executed.

    Args:
        verb: The git subcommand to run (for example ``'log'``).
        *args: Arguments placed after the subcommand.
        git_flag: Optional single flag inserted between ``git`` and ``verb``.
        cwd: Working directory handed to ``subprocess.run``.
        input: Text handed to ``subprocess.run`` as the child's stdin.

    Returns:
        The captured stdout text; stderr is redirected into the same stream.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    logging.debug('Running\ngit %s %s\n with input: %s', verb, ' '.join(args), input)

    # A global flag, when present, has to sit in front of the subcommand.
    global_flags = [git_flag] if git_flag else []
    argv = ['git', *global_flags, verb, *args]
    print(' '.join(argv))

    completed = subprocess.run(
        argv,
        cwd=cwd,
        input=input,
        text=True,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    return completed.stdout
def _git(verb: str, *args, git_flag: Optional[str] = None, cwd=None, input=None) -> str:
    """Run a git command via ``_git_long`` and log its output at info level.

    Args:
        verb: The git subcommand to run.
        *args: Arguments placed after the subcommand.
        git_flag: Optional single flag inserted between ``git`` and ``verb``.
        cwd: Working directory for the git process.
        input: Text passed to the git process on stdin.

    Returns:
        The command output converted with ``str``.
    """
    stdout = str(_git_long(verb, *args, git_flag=git_flag, cwd=cwd, input=input))
    logging.info('git %s stdout: %s', verb, stdout)
    return stdout
class CommitMessage:
    """Holds the text of a commit message.

    Attributes are set directly from the constructor argument:
    ``commit_message`` is the string passed in, stored unchanged.
    """

    def __init__(self, commit_message: str) -> None:
        # Stored verbatim; no parsing happens here.
        self.commit_message = commit_message
class Commit:
    """Holds the raw text of a commit.

    ``raw_commit`` is the string passed to the constructor, stored unchanged.
    """

    def __init__(self, raw_commit: str) -> None:
        # Stored verbatim; no parsing happens here.
        self.raw_commit = raw_commit
class Git:
    """Convenience wrapper that runs git commands against one directory."""

    def __init__(self, git_dir: str) -> None:
        # Directory used as the working directory (or clone target) for commands.
        self._git_dir = git_dir

    def __call__(self, verb: str, *args) -> str:
        """Run an arbitrary git subcommand with the wrapped directory as cwd."""
        return _git(verb, *args, cwd=self._git_dir)

    def clone(self, remote, *args) -> str:
        """Clone ``remote`` into the wrapped directory.

        ``args`` are placed before the ``--`` separator; no cwd is set for
        this command.
        """
        clone_args = [*args, '--', remote, self._git_dir]
        return _git('clone', *clone_args)

    def am(self, patch_contents: str) -> str:
        """Run ``git am`` with ``patch_contents`` supplied on stdin."""
        return _git('am', input=patch_contents, cwd=self._git_dir)

    def push(self, remote_branch: str) -> str:
        """Run ``git push -u origin <remote_branch>``."""
        push_args = ('-u', 'origin', remote_branch)
        return _git('push', *push_args, cwd=self._git_dir)

    def config(self, config: str, option: str) -> str:
        """Run ``git config --local <config> <option>``."""
        config_args = ('--local', config, option)
        return _git('config', *config_args, cwd=self._git_dir)

    def commit(self, *args) -> str:
        """Run ``git commit`` with the given arguments."""
        return _git('commit', *args, cwd=self._git_dir)

    def log(self, *args) -> str:
        """Run ``git --no-pager log`` with the given arguments.

        Unlike the other wrappers this goes through ``_git_long`` directly.
        """
        return _git_long('log', *args, cwd=self._git_dir, git_flag='--no-pager')

    def log_no_page(self, commit_hash: str) -> CommitMessage:
        """Return the ``git --no-pager log -n 1`` output for ``commit_hash``."""
        log_text = _git('log', '-n 1', commit_hash, cwd=self._git_dir, git_flag='--no-pager')
        return CommitMessage(log_text)

    def show(self, *args) -> Commit:
        """Return the ``git --no-pager show`` output wrapped in a ``Commit``."""
        show_text = _git('show', *args, cwd=self._git_dir, git_flag='--no-pager')
        return Commit(show_text)
class BugFixChecker:
    """Base bug-fix checker whose defaults match nothing."""

    def bug_fix_log_grep(self) -> str:
        """Return the text used to build the ``--grep=`` filter (empty here)."""
        grep_text = ''
        return grep_text

    def is_bug_fix(self, commit_message: CommitMessage) -> bool:
        """Return whether ``commit_message`` is a bug fix (always False here)."""
        return False
class UpstreamKernelBugFixChecker(BugFixChecker):
    """Treats commits carrying a ``Fixes: <hex hash> (`` line as bug fixes."""

    # Raw string so that ``\s`` and ``\(`` reach the regex engine as written
    # without triggering Python's invalid-escape-sequence warning. Compiled
    # once here instead of on every call.
    _FIXES_LINE_PATTERN = re.compile(r'^\s*Fixes: ([0-9a-fA-F]+) \(', re.MULTILINE)

    def bug_fix_log_grep(self) -> str:
        """Return the text used to build the ``--grep=`` filter."""
        return 'Fixes:'

    def is_bug_fix(self, commit_message: CommitMessage) -> bool:
        """Return True when any line of the message matches the Fixes pattern.

        Args:
            commit_message: Object whose ``commit_message`` attribute holds the
                text to inspect.

        Returns:
            True if a line consists of optional leading whitespace, ``Fixes: ``,
            a hexadecimal hash, a space and an opening parenthesis at its start.
        """
        found = self._FIXES_LINE_PATTERN.search(commit_message.commit_message)
        return found is not None
def find_bug_fixes(git: Git, bug_fix_checker: BugFixChecker) -> Generator[str, None, None]:
    """Yield the hashes of commits that the checker classifies as bug fixes.

    The log is first narrowed with ``--grep=<checker text>``; each surviving
    commit's message is then fetched and handed to the checker.

    Args:
        git: Object providing ``log`` and ``log_no_page``.
        bug_fix_checker: Object providing ``bug_fix_log_grep`` and ``is_bug_fix``.

    Yields:
        Each abbreviated commit hash (``%h``) accepted by the checker, in the
        order the log returned them.
    """
    grep_filter = '--grep=' + bug_fix_checker.bug_fix_log_grep()
    candidate_hashes = git.log(grep_filter, '--pretty=format:%h').splitlines()
    for commit_hash in candidate_hashes:
        # The grep is only a coarse pre-filter; the checker has the final say.
        if not bug_fix_checker.is_bug_fix(git.log_no_page(commit_hash)):
            continue
        print('Found bug: ', commit_hash)
        yield commit_hash
class PathToBugListMap:
    """Maps a file path to the list of bug-fix hashes recorded for it."""

    def __init__(self) -> None:
        # path -> list of bug-fix hashes, in insertion order.
        self._path_to_bug_list_map = {}

    def add_bug_to_path(self, path: str, bug_fix_hash: str) -> None:
        """Append ``bug_fix_hash`` to the list kept for ``path``."""
        hashes_for_path = self._path_to_bug_list_map.setdefault(path, [])
        hashes_for_path.append(bug_fix_hash)

    def add_bug_to_paths(self, path_list: List[str], bug_fix_hash: str) -> None:
        """Record ``bug_fix_hash`` against every path in ``path_list``."""
        for each_path in path_list:
            self.add_bug_to_path(each_path, bug_fix_hash)

    def serialize(self, file_name) -> None:
        """Write one ``<path> <hash> `` line per recorded pair to ``file_name``.

        The file is opened for writing, so existing content is replaced.
        """
        with open(file_name, 'w') as out:
            for path, hashes_for_path in self._path_to_bug_list_map.items():
                for one_hash in hashes_for_path:
                    # Joining with the newline as a third item leaves a space
                    # before the line break; readers split on whitespace.
                    line = ' '.join([path, one_hash, '\n'])
                    out.write(line)
def path_to_bug_list_map_from_file(file_name: str) -> PathToBugListMap:
    """Load a ``PathToBugListMap`` from a whitespace-separated text file.

    Each non-blank line is expected to hold a path followed by a bug-fix
    hash. The hash is taken as the last whitespace-separated field, so a path
    that itself contains whitespace is kept whole. Blank lines are skipped.

    Args:
        file_name: Path of the file to read.

    Returns:
        A map populated with one entry per non-blank line, in file order.

    Raises:
        ValueError: If a non-blank line does not contain both a path and a hash.
    """
    path_to_bug_list_map = PathToBugListMap()
    with open(file_name, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            stripped = line.strip()
            if not stripped:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            # Split once from the right: the hash never contains whitespace,
            # while the path might.
            fields = stripped.rsplit(None, 1)
            if len(fields) != 2:
                raise ValueError(
                    '%s:%d: expected "<path> <hash>", got %r' % (file_name, line_number, line))
            path, bug_fix_hash = fields
            path_to_bug_list_map.add_bug_to_path(path, bug_fix_hash)
    return path_to_bug_list_map
def find_affected_files_in_commit(commit: Commit) -> List[str]:
    """Return the files named in a commit's diff headers that exist on disk.

    Every ``diff --git a/<old> b/<new>`` line in ``commit.raw_commit`` is
    inspected, and the ``<new>`` path is kept when ``os.path.isfile`` reports
    it as an existing file. Each candidate path is echoed to stdout.

    NOTE(review): the existence check uses the path exactly as it appears in
    the diff header, so it is resolved against the process's current working
    directory — confirm callers run from the repository root.

    Args:
        commit: Object whose ``raw_commit`` attribute holds the commit text.

    Returns:
        The matching paths, in the order they appear in the commit text.
    """
    # Raw string: the pattern contains backslash escapes meant for the regex
    # engine, which a plain literal would report as invalid escape sequences.
    diff_header_pattern = r'^diff --git a\/\S+ b\/(\S+)$'
    results = []
    for match in re.finditer(diff_header_pattern, commit.raw_commit, re.MULTILINE):
        affected_file = match.group(1)
        print('Found file: ', affected_file)
        if os.path.isfile(affected_file):
            results.append(affected_file)
    return results
def build_path_to_bug_list_map(git: Git, bug_fix_checker: BugFixChecker) -> PathToBugListMap:
    """Build the path -> bug-fix-hash map for every bug fix found in the log.

    Args:
        git: Repository wrapper used to list and show commits.
        bug_fix_checker: Decides which commits count as bug fixes.

    Returns:
        A map in which each affected file is associated with the hashes of the
        bug-fix commits that touched it. Commits whose processing raises
        ``UnicodeDecodeError`` are logged and left out.
    """
    bug_map = PathToBugListMap()
    for fix_hash in find_bug_fixes(git, bug_fix_checker):
        try:
            touched_paths = find_affected_files_in_commit(git.show(fix_hash))
            bug_map.add_bug_to_paths(touched_paths, fix_hash)
        except UnicodeDecodeError as decode_error:
            # Skip this commit and carry on with the rest.
            logging.exception('Failed to show commit for %s, %s', fix_hash, decode_error)
    return bug_map
def generate_path_to_bug_list(git: Git, path_to_bug_list_file: str, bug_fix_checker=UpstreamKernelBugFixChecker()) -> None:
    """Scan the repository for bug fixes and write the result to a file.

    Args:
        git: Repository wrapper to scan.
        path_to_bug_list_file: Destination file for the serialized map.
        bug_fix_checker: Decides which commits count as bug fixes; defaults to
            an ``UpstreamKernelBugFixChecker`` instance created when this
            function is defined.
    """
    build_path_to_bug_list_map(git, bug_fix_checker).serialize(path_to_bug_list_file)
class BugFixFileTreeNode:
    """Base node of the bug-fix file tree; every operation is a no-op default."""

    def get_bugs_of_children(self) -> List[str]:
        """Return the bug hashes below this node (none for the base node)."""
        return []

    def get_number_of_child_bugs(self) -> int:
        """Return how many bug hashes ``get_bugs_of_children`` reports."""
        child_bugs = self.get_bugs_of_children()
        return len(child_bugs)

    def sort(self) -> None:
        """Order the node's contents; nothing to do in the base node."""
        return None

    def print(self, indent: str = '') -> None:
        """Print the node with the given indent; prints nothing in the base node."""
        return None

    def add_path_to_bug_mapping(self, path: pathlib.Path, bug_hash: str) -> None:
        """Record ``bug_hash`` for ``path``; ignored by the base node."""
        return None
class BugFixFileNode(BugFixFileTreeNode):
    """Leaf node: a single file together with the bug hashes recorded for it."""

    def __init__(self, file_name: str, bug_list: Optional[List[str]] = None) -> None:
        self.file_name = file_name
        # A missing or empty ``bug_list`` results in a fresh list.
        self._bug_list = bug_list if bug_list else []

    def get_bugs_of_children(self) -> List[str]:
        """Return the list of bug hashes held by this file node."""
        return self._bug_list

    def add_path_to_bug_mapping(self, path: pathlib.Path, bug_hash: str) -> None:
        """Append ``bug_hash`` after asserting ``path`` names exactly this file.

        The assertions compare ``path`` with the result of
        ``os.path.basename`` and with ``file_name``; the directory node in
        this file calls this method with a plain string name.
        """
        assert os.path.basename(path) == path
        assert path == self.file_name
        self._bug_list.append(bug_hash)

    def sort(self) -> None:
        """Sort the stored bug hashes in place."""
        self._bug_list.sort()

    def print(self, indent: str = '') -> None:
        """Print the file name and its bug count, prefixed by ``indent``."""
        # Only the count is printed, not the individual hashes.
        bug_count = str(len(self._bug_list))
        print(indent, self.file_name, ': ', bug_count)
class BugFixDirectoryNode(BugFixFileTreeNode):
    """Inner node: a directory holding child directory nodes and file nodes."""

    def __init__(self,
                 directory_name: str,
                 child_directory_nodes: Optional[Dict[str, 'BugFixDirectoryNode']] = None,
                 child_file_nodes: Optional[Dict[str, BugFixFileNode]] = None) -> None:
        self.directory_name = directory_name
        # Children are keyed by their name; a missing or empty argument
        # results in a fresh dict.
        self._child_directory_nodes = child_directory_nodes or {}
        self._child_file_nodes = child_file_nodes or {}
        # Both of these are only populated by sort().
        self._child_nodes_sorted = []
        self._number_of_child_bugs = 0

    def get_bugs_of_children(self) -> List[str]:
        """Return all bug hashes below this directory, directories first."""
        results = []
        # Iterate the node objects (dict values) and call the accessor; the
        # dicts are keyed by name, so iterating them directly yields strings.
        for child in self._child_directory_nodes.values():
            results += child.get_bugs_of_children()
        for child in self._child_file_nodes.values():
            results += child.get_bugs_of_children()
        return results

    def get_number_of_child_bugs(self) -> int:
        """Return the bug total computed by the most recent ``sort()`` call.

        The value is 0 until ``sort()`` has been called.
        """
        return self._number_of_child_bugs

    def add_path_to_bug_mapping(self, path: pathlib.Path, bug_hash: str) -> None:
        """Record ``bug_hash`` for the file at ``path`` relative to this node.

        Intermediate directory nodes and the final file node are created on
        demand.

        Args:
            path: Path below this directory; its first part names the child.
            bug_hash: Hash to record for the file.

        Raises:
            IndexError: If ``path`` has no parts.
        """
        child_name = path.parts[0]
        if len(path.parts) > 1:
            remaining_path = pathlib.Path(*path.parts[1:])
            # Look up first so a new node is only built when it is needed.
            child_directory = self._child_directory_nodes.get(child_name)
            if child_directory is None:
                child_directory = BugFixDirectoryNode(child_name)
                self._child_directory_nodes[child_name] = child_directory
            child_directory.add_path_to_bug_mapping(remaining_path, bug_hash)
        else:
            child_file = self._child_file_nodes.get(child_name)
            if child_file is None:
                child_file = BugFixFileNode(child_name)
                self._child_file_nodes[child_name] = child_file
            # The file node is addressed by its plain string name.
            child_file.add_path_to_bug_mapping(child_name, bug_hash)

    def sort(self) -> None:
        """Sort all descendants and order children by descending bug count.

        Also recomputes the bug total reported by ``get_number_of_child_bugs``.
        """
        self._child_nodes_sorted = [
            *self._child_directory_nodes.values(),
            *self._child_file_nodes.values(),
        ]
        self._number_of_child_bugs = 0
        for child in self._child_nodes_sorted:
            # Children must be sorted first so their own totals are current.
            child.sort()
            self._number_of_child_bugs += child.get_number_of_child_bugs()
        self._child_nodes_sorted.sort(
            key=lambda child: child.get_number_of_child_bugs(), reverse=True)

    def print(self, indent: str = '') -> None:
        """Print this directory's total, then each child one tab deeper.

        Children are printed in the order established by ``sort()``; nothing
        below this node is printed if ``sort()`` has not been called.
        """
        print(indent, self.directory_name, ': ', str(self._number_of_child_bugs))
        indent += '\t'
        for child in self._child_nodes_sorted:
            child.print(indent)
class BugFixFileTree:
    """Directory tree of bug-fix counts rooted at an unnamed directory node."""

    def __init__(self) -> None:
        # The root directory node is created with an empty name.
        self._root_directory_node = BugFixDirectoryNode('')

    def build_from_path_to_bug_list_map(self, path_to_bug_list_map: PathToBugListMap) -> None:
        """Insert every (path, bug hash) pair of the map into the tree."""
        root = self._root_directory_node
        path_hash_pairs = (
            (path, bug_hash)
            for path, bug_list in path_to_bug_list_map._path_to_bug_list_map.items()
            for bug_hash in bug_list
        )
        for path, bug_hash in path_hash_pairs:
            root.add_path_to_bug_mapping(pathlib.Path(path), bug_hash)

    def print(self) -> None:
        """Sort the tree, then print it starting from the root."""
        root = self._root_directory_node
        root.sort()
        root.print()
def print_heat_map(git: Git, path_to_bug_list_file: str) -> None:
    """Print the bug-fix tree built from a serialized path-to-bug-list file.

    Args:
        git: Not used by this function.
        path_to_bug_list_file: File to load the path-to-bug-list map from.
    """
    tree = BugFixFileTree()
    tree.build_from_path_to_bug_list_map(
        path_to_bug_list_map_from_file(path_to_bug_list_file))
    tree.print()
def print_file_paths_with_bug_numbers(git: Git, path_to_bug_list_file: str) -> None:
    """Print ``<path> ,  <count>`` lines, highest bug count first.

    Args:
        git: Not used by this function.
        path_to_bug_list_file: File to load the path-to-bug-list map from.
    """
    loaded_map = path_to_bug_list_map_from_file(path_to_bug_list_file)
    # Paths whose bug list is empty get no entry at all.
    bug_counts = {
        path: len(bug_list)
        for path, bug_list in loaded_map._path_to_bug_list_map.items()
        if bug_list
    }
    ranked = sorted(bug_counts.items(), key=lambda pair: pair[1], reverse=True)
    for path, bug_number in ranked:
        print(path, ', ', bug_number)