# GNAThub (GNATdashboard)
# Copyright (C) 2013-2020, AdaCore
#
# This is free software; you can redistribute it and/or modify it under
# terms of the GNU General Public License as published by the Free Soft-
# ware Foundation; either version 3, or (at your option) any later ver-
# sion. This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHAN-
# TABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
# License for more details. You should have received a copy of the GNU
# General Public License distributed with this software; see file
# COPYING3. If not, go to http://www.gnu.org/licenses for a complete copy
# of the license.
"""GNAThub plug-in for the CodePeer command-line tool.
It exports the CodePeer class which implements the :class:`GNAThub.Plugin`
interface. This allows GNAThub's plug-in scanner to automatically find this
module and load it as part of the GNAThub default execution.
"""
import collections
import csv
import json
import os
import os.path
import subprocess
import re
import json
from shutil import which
import GNAThub
from GNAThub import Console, Plugin, Reporter, Runner, ToolArgsPlaceholder
import _codepeer
class CodePeer(Plugin, Runner, Reporter):
    """CodePeer plugin for GNAThub.

    Configures and executes CodePeer, then analyzes the output.
    """

    # Severity strings (lower-cased, spaces removed, see __get_ranking)
    # mapped to the corresponding GNAThub ranking constants.
    CODEPEER_TO_RANKING = {
        "annotation": GNAThub.RANKING_ANNOTATION,
        "info": GNAThub.RANKING_INFO,
        "low": GNAThub.RANKING_LOW,
        "medium": GNAThub.RANKING_MEDIUM,
        "high": GNAThub.RANKING_HIGH,
    }

    @property
    def output_dir(self):
        """Return the path to the directory where to generate the Codepeer files.

        :return: the full path to the output directory
        :rtype: str
        """
        # Note: if you change something here, propagate the change to
        # codepeer_obj_dir in html_report.py too.
        suffix = "gnatsas" if _codepeer.version == _codepeer.GNATSAS else "codepeer"
        return os.path.join(GNAThub.Project.artifacts_dir(), suffix)

    def __init__(self):
        """Initialize the plug-in state.

        When GNAThub runs in dry-run mode without a project, only the base
        classes are initialized and no instance attribute is set.
        """
        super().__init__()

        if GNAThub.dry_run_without_project():
            return

        # GNAThub.Tool instance, created by report()
        self.tool = None

        # The legacy tool produces a CSV report, the other versions a JSON one
        self.ext = "csv" if _codepeer.version == _codepeer.LEGACY else "json"
        self.report_file = os.path.join(
            self.output_dir,
            "{}.{}".format(GNAThub.Project.name().lower(), self.ext),
        )

        # Map of rules: rule identifier -> GNAThub.Rule
        self.rules = {}

        # Map of messages:
        # (rule, message text, ranking, message id) -> GNAThub.Message
        self.messages = {}

        # Map of bulk data: source -> list of [message, line, column, column]
        self.bulk_data = collections.defaultdict(list)

    @staticmethod
    def __cmd_line():
        """Create CodePeer command line arguments list.

        :return: the CodePeer command line
        :rtype: collections.Iterable[str]
        """
        if _codepeer.version == _codepeer.GNATSAS:
            binary = ["gnatsas", "analyze", ToolArgsPlaceholder("gnatsas")]
        elif _codepeer.version == _codepeer.CPL:
            binary = ["codepeer", ToolArgsPlaceholder("codepeer")]
        else:
            # Emulate old behavior, do not pass CodePeer options here.
            # Unclear it's the right approach
            binary = ["codepeer"]

        cmd_line = binary + ["-P", GNAThub.Project.path(), "-j%d" % GNAThub.jobs()]

        if GNAThub.u_process_all():
            cmd_line.append("-U")

        # Keeping this for -U main switch implementation
        # if GNAThub.u_main():
        #     cmd_line.extend(['-U'])
        #     cmd_line.extend([GNAThub.u_main()])

        if GNAThub.subdirs():
            cmd_line.append("--subdirs=" + GNAThub.subdirs())

        return cmd_line + GNAThub.Project.scenario_switches()

    @staticmethod
    def __report_cmd_line(report):
        """Create the CodePeer command-line to extract the report.

        :param str report: path of the report file to generate
        :return: the command line
        :rtype: collections.Iterable[str]
        """
        if _codepeer.version == _codepeer.GNATSAS:
            binary = ["gnatsas", "report", "gnathub"]
            out = ["-o", report]
            tool_key = "gnatsas"
            reader_key = "gnatsas-report"
        else:
            binary = ["codepeer"]
            tool_key = "codepeer"
            reader_key = "codepeer_msg_reader"
            if _codepeer.version == _codepeer.CPL:
                out = ["--no-analysis", "--gnathub", "--out", report]
            else:
                out = ["-output-msg-only", "-csv", "-csv-out", report]

        cmd = binary + ["-P", GNAThub.Project.path()]

        # Extend previous (and slightly odd) behavior
        cmd.append(ToolArgsPlaceholder(tool_key))

        if GNAThub.subdirs():
            cmd.append("--subdirs=" + GNAThub.subdirs())

        cmd.extend(GNAThub.Project.scenario_switches())
        cmd.append(ToolArgsPlaceholder(reader_key))

        # Must come last in case the user has set it too.
        cmd.extend(out)
        return cmd

    def run(self):
        """Execute CodePeer.

        Returns the execution status according to the success of the
        execution of the tool:

        * ``GNAThub.EXEC_SUCCESS``: on successful execution
        * ``GNAThub.EXEC_FAILURE``: on any error
        """
        proc = GNAThub.Run(self.name, self.__cmd_line())
        return GNAThub.EXEC_SUCCESS if proc.status == 0 else GNAThub.EXEC_FAILURE

    def report(self):
        """Execute CodePeer message reader and parses the output.

        Writes the tool version to ``version.txt`` in the output directory,
        extracts the report (CSV for the legacy tool, JSON otherwise), and
        records every message it contains.

        Returns the execution status according to the success of the analysis:

        * ``GNAThub.EXEC_SUCCESS``: on successful execution and analysis
        * ``GNAThub.EXEC_FAILURE``: on any error
        """
        legacy = _codepeer.version == _codepeer.LEGACY

        # Get codepeer version
        dest = os.path.join(self.output_dir, "version.txt")
        cmd = (
            ["gnatsas", "--version"]
            if _codepeer.version == _codepeer.GNATSAS
            else ["codepeer", "-v"]
        )
        try:
            # text=True: decode the output instead of stringifying the bytes
            # object, which would yield a "b'...'" representation.
            output = subprocess.run(
                cmd, check=True, capture_output=True, text=True
            ).stdout
        except (OSError, subprocess.CalledProcessError) as why:
            self.log.exception("failed to run %s", " ".join(cmd))
            self.error("failed to get the tool version (%s)" % why)
            return GNAThub.EXEC_FAILURE

        # DOTALL lets the greedy prefix span lines, so the last version-like
        # token of the whole output is picked.
        found = re.match(r".* ([0-9]+\.[0-9]+(w|rc)?) .*", output, re.DOTALL)
        if found:
            version = found.group(1)
        else:
            self.log.debug("Unrecognized version number %s", output)
            version = output  # fallback

        self.info("Codepeer version put in %s" % dest)
        with open(dest, "w") as fd:
            fd.write(version)

        # Clear existing references only if not incremental run
        if not GNAThub.incremental():
            self.info("clear existing results if any")
            GNAThub.Tool.clear_references(self.name)

        self.info("extract results to %s" % self.report_file)
        # NOTE(review): run() passes self.name as first argument of
        # GNAThub.Run whereas the output directory is passed here -- confirm
        # this is intended.
        proc = GNAThub.Run(self.output_dir, self.__report_cmd_line(self.report_file))

        if proc.status != 0:
            return GNAThub.EXEC_FAILURE

        self.info(f"analyse {self.ext} report from {self.report_file}")
        self.tool = GNAThub.Tool(self.name)

        self.log.debug("parse report: %s", self.report_file)

        if not os.path.isfile(self.report_file):
            self.error("no report found")
            return GNAThub.EXEC_FAILURE

        # Only set when a CSV report is read; used to locate parse errors
        reader = None

        with open(self.report_file, "r") as report:
            # Create the tags describing the history of CodePeer messages
            added_tag = GNAThub.Property("codepeer:added", "Added")
            removed_tag = GNAThub.Property("codepeer:removed", "Removed")
            unchanged_tag = GNAThub.Property("codepeer:unchanged", "Unchanged")
            history_tags = {"added": added_tag, "removed": removed_tag}

            try:
                if legacy:
                    reader = csv.reader(report, quotechar='"')
                    rows = list(reader)

                    # Drop the first line (containing the columns name); an
                    # empty report simply yields no record.
                    output = rows[1:]
                    if rows:
                        self.log.debug("drop header line: %s", rows[0])
                else:
                    # Parse the json file
                    output = json.load(report)

                total = len(output)
                reviews = []

                # Iterate over each relevant record
                for index, record in enumerate(output, start=1):
                    self.log.debug("parse record: %r", record)

                    if not legacy:
                        # We need an id for each message. Use the record
                        # position in the report.
                        message_id = str(index)
                        source = record["source"]
                        line = record["line"]
                        column = record["column"]
                        rule = record["rule_id"]
                        history = record["history"]
                        severity = record["severity"]
                        message = record["message"]

                        if record["review"] != "None":
                            review = record["review"][1]
                            review["identifier"] = message_id
                            reviews.append(review)
                    else:
                        # Each row is a list of strings:
                        #
                        # File, Line, Column, Category, History, Has_Review,
                        # Ranking, Kind, Message, Classification, CWE, Checks,
                        # Primary_Checks, Subp, Timestamp, Approved By,
                        # Comment, Message_Id, Status
                        #
                        # Underscore-prefixed names are columns this plug-in
                        # does not use.
                        (
                            source,
                            line,
                            column,
                            rule,
                            history,
                            _has_review,
                            severity,
                            _category,
                            message,
                            _classification,
                            _cwe,
                            _checks,
                            _pchecks,
                            _subp,
                            _timestamp,
                            _app_by,
                            _comment,
                            message_id,
                            _status_category,
                        ) = record[:19]

                    if not severity or severity == "suppressed":
                        # Some versions of codepeer report an empty severity
                        # for suppressed messages: map this to 'info'.
                        severity = "info"

                    self.__add_message(
                        source,
                        line,
                        column,
                        rule.lower(),
                        message,
                        severity,
                        message_id,
                        [history_tags.get(history, unchanged_tag)],
                    )

                    if index % 100 == 1 or index == total:
                        Console.progress(index, total, new_line=(index == total))

                if not legacy:
                    reviews_file = os.path.join(
                        self.output_dir, "codepeer_review_csv.json"
                    )
                    with open(reviews_file, "w") as outfile:
                        outfile.write(json.dumps(reviews, indent=2))

            except (csv.Error, json.JSONDecodeError) as why:
                self.log.exception(f"failed to parse {self.ext} report")
                # Both errors are raised while the file is being read, that
                # is before any record is processed: report the line number
                # known to the parser.
                if isinstance(why, json.JSONDecodeError):
                    where = why.lineno
                else:
                    where = reader.line_num
                self.error(
                    "%s (%s:%d)" % (why, os.path.basename(self.report_file), where)
                )
                return GNAThub.EXEC_FAILURE

            else:
                self.__do_bulk_insert()
                return GNAThub.EXEC_SUCCESS

    def __get_ranking(self, severity):
        """Get corresponding ranking for a given severity.

        The severity is lower-cased and stripped of its spaces before the
        lookup in :attr:`CODEPEER_TO_RANKING`.

        :param str severity: message severity string value
        :return: the corresponding ranking value, or
            ``GNAThub.RANKING_UNSPECIFIED`` for an unknown severity
        """
        sev = severity.lower().replace(" ", "")
        return self.CODEPEER_TO_RANKING.get(sev, GNAThub.RANKING_UNSPECIFIED)

    def __add_message(
        self, src, line, column, rule_id, msg, category, tool_msg_id, properties
    ):
        """Queue a CodePeer message for insertion in the session database.

        The message is stored in ``self.bulk_data`` and actually inserted by
        ``__do_bulk_insert``.

        :param str src: message source file
        :param str line: message line number
        :param str column: message column number
        :param str rule_id: message rule identifier
        :param str msg: description of the message
        :param str category: the category (severity) of the message
        :param str tool_msg_id: the original id of the message
        :param properties: the message properties
        :type properties: collections.Iterable[GNAThub.Property] or None
        """
        # Get message ranking value
        ranking = self.__get_ranking(category)

        # Cache the rules
        rule = self.rules.get(rule_id)
        if rule is None:
            rule = GNAThub.Rule(rule_id, rule_id, GNAThub.RULE_KIND, self.tool)
            self.rules[rule_id] = rule

        # Get message id from string; 0 when it is missing or not a number
        msg_id = 0
        if tool_msg_id and tool_msg_id.strip().isdigit():
            msg_id = int(tool_msg_id)

        # Cache the messages. The properties are not part of the key: a
        # cached message keeps the properties it was first created with.
        key = (rule, msg, ranking, msg_id)
        if key in self.messages:
            message = self.messages[key]
        else:
            message = GNAThub.Message(rule, msg, ranking, msg_id, properties)
            self.messages[key] = message

        # Add the message to the given resource (the column is used for both
        # trailing fields)
        self.bulk_data[src].append([message, int(line), int(column), int(column)])

    def __do_bulk_insert(self):
        """Insert the codepeer messages in bulk on each resource."""
        # List of resource messages suitable for tool level bulk insertion
        resources_messages = []

        for src, messages in self.bulk_data.items():
            base = GNAThub.Project.source_file(os.path.basename(src))
            resource = GNAThub.Resource.get(base)

            # Messages attached to a source with no matching resource are
            # left out of the insertion.
            if resource:
                resources_messages.append([resource, messages])

        self.tool.add_messages(resources_messages, [])