CCG_mirri_utils/mirri/io/parsers/mirri_excel.py

286 lines
11 KiB
Python

import re
from datetime import date
from io import BytesIO
import pycountry
from openpyxl import load_workbook
from mirri import rsetattr, ValidationError
from mirri.biolomics.serializers.sequence import GenomicSequenceBiolomics
from mirri.biolomics.serializers.strain import StrainMirri
from mirri.entities.growth_medium import GrowthMedium
from mirri.io.parsers.excel import workbook_sheet_reader
from mirri.entities.publication import Publication
from mirri.entities.date_range import DateRange
from mirri.entities.strain import OrganismType, StrainId, add_taxon_to_strain
from mirri.settings import (COMMERCIAL_USE_WITH_AGREEMENT, GENOMIC_INFO,
GROWTH_MEDIA, LITERATURE_SHEET, LOCATIONS,
MIRRI_FIELDS, NAGOYA_DOCS_AVAILABLE, NAGOYA_NO_RESTRICTIONS,
NAGOYA_PROBABLY_SCOPE, NO_RESTRICTION,
ONLY_RESEARCH, ONTOBIOTOPE,
PUBLICATION_FIELDS, STRAINS, SUBTAXAS)
from mirri.utils import get_country_from_name
# Spreadsheet cells store these fields as small integer codes; the tables
# below translate each code into the value stored on the strain.

# Codes accepted for the "restriction_on_use" attribute.
RESTRICTION_USE_TRANSLATOR = {
    1: NO_RESTRICTION,
    2: ONLY_RESEARCH,
    3: COMMERCIAL_USE_WITH_AGREEMENT,
}

# Codes accepted for the "nagoya_protocol" attribute.
NAGOYA_TRANSLATOR = {
    1: NAGOYA_NO_RESTRICTIONS,
    2: NAGOYA_DOCS_AVAILABLE,
    3: NAGOYA_PROBABLY_SCOPE,
}

# Codes accepted for the yes/no attributes: 1 means False, 2 means True.
TRUEFALSE_TRANSLATOR = {
    1: False,
    2: True,
}
def parse_mirri_excel(fhand, version=""):
    """Parse a MIRRI Excel workbook written for a given specification version.

    Args:
        fhand: File handle of the workbook; it is handed unchanged to the
            version-specific parser.
        version: Specification version string. Only ``"5.1.2"`` is handled.

    Returns:
        The result of the version-specific parser.

    Raises:
        NotImplementedError: If ``version`` is anything other than "5.1.2".
    """
    # Guard clause: reject unsupported versions up front and say which
    # version was actually requested.
    if version != "5.1.2":
        raise NotImplementedError(
            f"Only version 5.1.2 is implemented (got {version!r})"
        )
    return _parse_mirri_v12052023(fhand)
def _parse_mirri_v12052023(fhand):
    """Load a version 5.1.2 workbook and parse its sheets.

    The handle is rewound and its whole content is copied into memory before
    the workbook is opened in read-only, values-only mode.

    Args:
        fhand: Seekable binary file handle of the Excel workbook.

    Returns:
        A dict with the strains produced by ``parse_strains`` under
        ``"strains"`` and the list of parsed growth media under
        ``"growth_media"``.
    """
    fhand.seek(0)
    workbook = load_workbook(filename=BytesIO(fhand.read()), read_only=True,
                             data_only=True)

    # Dict displays evaluate their values top to bottom, so the sheets are
    # still read in this exact order.
    sheets = {
        "locations": workbook_sheet_reader(workbook, LOCATIONS),
        "ontobiotopes": workbook_sheet_reader(workbook, ONTOBIOTOPE),
        "growth_media": list(parse_growth_media(workbook)),
        "markers": workbook_sheet_reader(workbook, GENOMIC_INFO),
        "publications": list(parse_publications(workbook)),
    }
    return {
        "strains": parse_strains(workbook, **sheets),
        "growth_media": sheets["growth_media"],
    }
def index_list_by(list_, id_):
    """Index subscriptable items by the string form of one of their entries.

    Args:
        list_: Iterable of items supporting ``item[id_]``.
        id_: Key looked up on every item.

    Returns:
        A dict mapping ``str(item[id_])`` to the item; when several items
        share a key the last one wins.
    """
    indexed = {}
    for entry in list_:
        indexed[str(entry[id_])] = entry
    return indexed
def index_list_by_attr(list_, id_):
    """Index objects by the string form of one of their attributes.

    Args:
        list_: Iterable of objects exposing the attribute named ``id_``.
        id_: Name of the attribute used as key.

    Returns:
        A dict mapping ``str(getattr(item, id_))`` to the item; when several
        items share a key the last one wins.
    """
    return dict((str(getattr(entry, id_)), entry) for entry in list_)
def index_markers(markers):
    """Group marker rows by the value of their "Strain AN" entry.

    Args:
        markers: Iterable of mappings that each contain a "Strain AN" key.

    Returns:
        A dict mapping each "Strain AN" value to the list of rows carrying
        it, in their original order.
    """
    grouped = {}
    for row in markers:
        grouped.setdefault(row["Strain AN"], []).append(row)
    return grouped
def remove_hard_lines(string=None):
    """Strip carriage-return/newline runs and tabs from a text value.

    Args:
        string: Text to clean, or None.

    Returns:
        None when ``string`` is None or empty; otherwise the text with every
        ``\\r...\\n...`` run and every tab run removed and surrounding
        whitespace trimmed. A newline not preceded by ``\\r`` is kept.
    """
    if string is None or string == '':
        return None
    cleaned = re.sub(r'\r+\n+|\t+', '', string)
    return cleaned.strip()
def parse_growth_media(wb):
    """Yield one ``GrowthMedium`` per row of the growth media sheet.

    Args:
        wb: Workbook object accepted by ``workbook_sheet_reader``.

    Yields:
        GrowthMedium: Medium populated from the 'Acronym', 'Description' and
        optional 'Full description' columns of the row.
    """
    rows = workbook_sheet_reader(wb, GROWTH_MEDIA)
    for record in rows:
        medium = GrowthMedium()
        # Acronyms are always stored as strings, whatever the cell type.
        medium.acronym = str(record['Acronym'])
        medium.description = record['Description']
        raw_full_description = record.get('Full description', None)
        medium.full_description = remove_hard_lines(raw_full_description)
        yield medium
def parse_publications(wb):
    """Yield one ``Publication`` per row of the literature sheet.

    Each entry of ``PUBLICATION_FIELDS`` names a column ("label") and the
    publication attribute it feeds ("attribute").

    Args:
        wb: Workbook object accepted by ``workbook_sheet_reader``.

    Yields:
        Publication: Publication carrying every non-empty column of the row.
    """
    for row in workbook_sheet_reader(wb, LITERATURE_SHEET):
        pub = Publication()
        for pub_field in PUBLICATION_FIELDS:
            col_val = row.get(pub_field["label"], None)
            # Missing and falsy cells leave the attribute untouched.
            if col_val:
                setattr(pub, pub_field["attribute"], col_val)
        yield pub
def parse_strains(wb, locations, growth_media, markers, publications,
                  ontobiotopes):
    """Yield one ``StrainMirri`` per row of the strains sheet.

    Every entry of ``MIRRI_FIELDS`` names a column ("label") and the strain
    attribute it feeds ("attribute"). Empty cells are skipped; the remaining
    ones are converted according to the attribute and stored on the strain.
    Marker rows whose "Strain AN" matches the strain id are attached last.

    Args:
        wb: Workbook object accepted by ``workbook_sheet_reader``.
        locations: Iterable of row mappings read with the keys 'Locality',
            'Country', 'Region' and 'City'.
        growth_media: Iterable of objects exposing an ``acronym`` attribute.
        markers: Iterable of row mappings read with the keys "Strain AN",
            "INSDC AN", "Marker" and "Sequence".
        publications: Iterable of objects exposing an ``id`` attribute.
        ontobiotopes: Iterable of row mappings with "ID" and "Name" keys.

    Yields:
        StrainMirri: The strain built from each row.

    Raises:
        ValidationError: If the taxon name is rejected with a ``ValueError``
            by ``add_taxon_to_strain``.
        NotImplementedError: If a date cell is neither a ``date`` nor a
            ``str``.
        KeyError: If a coded cell, a locality or an ontobiotope name is not
            present in its lookup table.
    """
    ontobiotopes_by_id = {str(ont["ID"]): ont['Name'] for ont in ontobiotopes}
    ontobiotopes_by_name = {v: k for k, v in ontobiotopes_by_id.items()}
    locations = index_list_by(locations, 'Locality')
    # NOTE(review): this index is built but no branch below reads it.
    growth_media = index_list_by_attr(growth_media, 'acronym')
    publications = index_list_by_attr(publications, 'id')
    markers = index_markers(markers)

    # Column holding the accession number, looked up once so that error
    # messages can name the offending strain whatever the field order is.
    id_label = next(
        (field["label"] for field in MIRRI_FIELDS
         if field["attribute"] == "id"),
        None,
    )

    for strain_row in workbook_sheet_reader(wb, STRAINS, "accessionNumber"):
        strain = StrainMirri()
        accession = strain_row[id_label] if id_label is not None else None
        for field in MIRRI_FIELDS:
            label = field["label"]
            attribute = field["attribute"]
            value = strain_row[label]
            if value is None or value == '':
                continue

            if attribute == "id":
                # "<collection> <number>", split on the first space only.
                collection, number = value.split(" ", 1)
                value = StrainId(collection=collection, number=number)
                rsetattr(strain, attribute, value)
            elif attribute == "restriction_on_use":
                rsetattr(strain, attribute, RESTRICTION_USE_TRANSLATOR[value])
            elif attribute == "nagoya_protocol":
                rsetattr(strain, attribute, NAGOYA_TRANSLATOR[value])
            elif attribute == "other_numbers":
                other_numbers = []
                for on in value.split(";"):
                    on = on.strip()
                    try:
                        collection, number = on.split(" ", 1)
                    except ValueError:
                        # No space: the whole token is the number.
                        collection = None
                        number = on
                    _id = StrainId(collection=collection, number=number)
                    other_numbers.append(_id)
                rsetattr(strain, attribute, other_numbers)
            elif attribute == "taxonomy.taxon_name":
                try:
                    add_taxon_to_strain(strain, value)
                except ValueError as err:
                    msg = (f"The '{label}' for strain with Accession Number "
                           f"{accession} is not according to the "
                           "specification.")
                    raise ValidationError(msg) from err
            elif attribute == "taxonomy.organism_type":
                value = [OrganismType(val.strip())
                         for val in str(value).split(";")]
                rsetattr(strain, attribute, value)
            elif attribute in ("deposit.date", "collect.date",
                               "isolation.date", "catalog_inclusion_date"):
                if isinstance(value, date):
                    value = DateRange(
                        year=value.year, month=value.month, day=value.day
                    )
                elif isinstance(value, str):
                    value = DateRange().strpdate(value)
                else:
                    raise NotImplementedError()
                rsetattr(strain, attribute, value)
            elif attribute == 'growth.recommended_temp':
                # A single temperature typed into Excel arrives as a number,
                # so normalise to text before splitting.
                temps = str(value).split(';')
                _min = float(temps[0])
                _max = float(temps[1]) if len(temps) > 1 else _min
                rsetattr(strain, attribute, {'min': _min, 'max': _max})
            elif attribute == "growth.recommended_media":
                # ";" takes precedence as separator; "/" is the fallback.
                sep = ";" if ";" in value else "/"
                media_acronyms = [v.strip() for v in value.split(sep)]
                rsetattr(strain, attribute, media_acronyms)
            elif attribute == 'growth.tested_temp_range':
                if value:
                    min_, max_ = value.split(";")
                    value = {'min': float(min_), 'max': float(max_)}
                    rsetattr(strain, attribute, value)
            elif attribute == "form_of_supply":
                rsetattr(strain, attribute, value.split(";"))
            elif attribute == "collect.location.coords":
                # latitude;longitude;precision;altitude[;uncertainty]
                items = value.split(";")
                strain.collect.location.latitude = float(items[0])
                strain.collect.location.longitude = float(items[1])
                strain.collect.location.precision = float(items[2])
                strain.collect.location.altitude = float(items[3])
                if len(items) > 4:
                    strain.collect.location.coord_uncertainty = items[4]
            elif attribute == "collect.site.links":
                # name;url[;uncertainty]
                items = value.split(";")
                strain.collect.site.links.nameSite = str(items[0])
                strain.collect.site.links.urlSite = str(items[1])
                # TODO: check the separator.
                # NOTE(review): this stores the split list on the same
                # attribute whose sub-attributes were just assigned, and the
                # uncertainty below is then set on whatever it now holds.
                # Confirm the intended order.
                rsetattr(strain, attribute, value.split(";"))
                if len(items) > 2:
                    strain.collect.site.links.site_uncertainty = items[2]
            elif attribute == "collect.location":
                location = locations[value]
                if 'Country' in location and location['Country']:
                    if location['Country'] == 'Unknown':
                        # NOTE(review): this also skips the Region, City and
                        # Locality assignments below. Confirm this is wanted.
                        continue
                    country_3 = _get_country_alpha3(location['Country'])
                    strain.collect.location.country = country_3
                strain.collect.location.state = location["Region"]
                strain.collect.location.municipality = location["City"]
                strain.collect.location.site = location["Locality"]
            elif attribute in ("abs_related_files", "mta_files"):
                rsetattr(strain, attribute, value.split(";"))
            elif attribute in ("is_from_registered_collection",
                               "is_subject_to_quarantine",
                               'taxonomy.interspecific_hybrid',
                               "is_potentially_harmful", "genetics.gmo"):
                rsetattr(strain, attribute, TRUEFALSE_TRANSLATOR[value])
            elif attribute == "publications":
                value = str(value)
                pubs = []
                pub_ids = [v.strip() for v in str(value).split(";")]
                for pub_id in pub_ids:
                    pub = publications.get(pub_id, None)
                    if pub is None:
                        # Not in the literature sheet: treat the token as a
                        # DOI when it contains "/", else as a PubMed id.
                        pub = Publication()
                        if '/' in pub_id:
                            pub.doi = pub_id
                        else:
                            pub.pubmed_id = pub_id
                    pubs.append(pub)
                rsetattr(strain, attribute, pubs)
            elif attribute == 'ontobiotope':
                # Tokens that are not known ids are looked up by name; an
                # unknown name raises KeyError.
                values = []
                for val in value.split(';'):
                    if val not in ontobiotopes_by_id:
                        val = ontobiotopes_by_name[val]
                    values.append(val)
                # NOTE(review): the raw cell text is stored, not the
                # translated `values` list. Confirm which one is intended.
                rsetattr(strain, attribute, value)
            elif attribute == 'other_denominations':
                value = [v.strip() for v in value.split(';')]
                rsetattr(strain, attribute, value)
            elif attribute == 'genetics.plasmids':
                value = [v.strip() for v in value.split(';')]
                rsetattr(strain, attribute, value)
            else:
                rsetattr(strain, attribute, value)

        # Attach the marker rows registered under this strain's id.
        strain_id = strain.id.strain_id
        if strain_id in markers:
            for marker in markers[strain_id]:
                _marker = GenomicSequenceBiolomics()
                _marker.marker_id = marker["INSDC AN"]
                _marker.marker_type = marker["Marker"]
                _marker.marker_seq = marker["Sequence"]
                strain.genetics.markers.append(_marker)
        yield strain
def _get_country_alpha3(loc_country):
    """Resolve a country given by name or alpha-3 code to its alpha-3 code.

    Lookup order: ``get_country_from_name``, then the current pycountry
    countries by alpha-3 code, then pycountry's historic countries by
    alpha-3 code.

    Args:
        loc_country: Country name or alpha-3 code. The special value 'INW'
            is returned unchanged without any lookup.

    Returns:
        The alpha-3 code of the resolved country, or 'INW'.

    Raises:
        ValidationError: If none of the lookups finds the country.
    """
    if loc_country == 'INW':
        return loc_country

    # `or` moves on to the next lookup only when the previous one found
    # nothing.
    country = (
        get_country_from_name(loc_country)
        or pycountry.countries.get(alpha_3=loc_country)
        or pycountry.historic_countries.get(alpha_3=loc_country)
    )
    if not country:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValidationError(
            f"The country '{loc_country}' could not be resolved to an "
            "alpha-3 country code."
        )
    return country.alpha_3