463 lines
18 KiB
Python
463 lines
18 KiB
Python
import re
|
||
import sys
|
||
import pycountry
|
||
|
||
from mirri import rgetattr, rsetattr
|
||
from mirri.entities.date_range import DateRange
|
||
from mirri.entities.strain import ORG_TYPES, OrganismType, StrainId, StrainMirri, add_taxon_to_strain
|
||
from mirri.biolomics.remote.endoint_names import (GROWTH_MEDIUM_WS, TAXONOMY_WS,
|
||
ONTOBIOTOPE_WS, BIBLIOGRAPHY_WS, SEQUENCE_WS, COUNTRY_WS)
|
||
from mirri.settings import (
|
||
ALLOWED_FORMS_OF_SUPPLY,
|
||
NAGOYA_PROBABLY_SCOPE,
|
||
NAGOYA_NO_RESTRICTIONS,
|
||
NAGOYA_DOCS_AVAILABLE,
|
||
NO_RESTRICTION,
|
||
ONLY_RESEARCH,
|
||
COMMERCIAL_USE_WITH_AGREEMENT,
|
||
)
|
||
from mirri.biolomics.settings import MIRRI_FIELDS
|
||
from mirri.utils import get_pycountry
|
||
|
||
# MIRRI Nagoya-protocol codes -> text written to the matching Biolomics field.
NAGOYA_TRANSLATOR = {
    NAGOYA_NO_RESTRICTIONS:
        "no known restrictions under the Nagoya protocol",
    NAGOYA_DOCS_AVAILABLE:
        "documents providing proof of legal access and terms of use available at the collection",
    NAGOYA_PROBABLY_SCOPE:
        "strain probably in scope, please contact the culture collection",
}
# Inverse lookup: Biolomics text -> MIRRI code.
REV_NAGOYA_TRANSLATOR = {
    text: code for code, text in NAGOYA_TRANSLATOR.items()
}

# MIRRI restriction-on-use codes -> text written to the Biolomics field.
RESTRICTION_USE_TRANSLATOR = {
    NO_RESTRICTION:
        "no restriction apply",
    ONLY_RESEARCH:
        "for research use only",
    COMMERCIAL_USE_WITH_AGREEMENT:
        "for commercial development a special agreement is requested",
}

# Inverse lookup: Biolomics text -> MIRRI code.
REV_RESTRICTION_USE_TRANSLATOR = {
    text: code for code, text in RESTRICTION_USE_TRANSLATOR.items()
}
|
||
|
||
# Field labels grouped by the way the serializers convert their values.

# Labels whose value is a date (written as "YYYY-MM-DD").
DATE_TYPE_FIELDS = (
    "Date of collection",
    "Date of isolation",
    "Date of deposit",
    "Date of inclusion in the catalogue",
)
# Labels whose value is a boolean written as 'yes' / 'no'.
BOOLEAN_TYPE_FIELDS = (
    "Strain from a registered collection",
    "Dual use",
    "Quarantine in Europe",
    "Interspecific hybrid",
)  # , 'GMO')
# Labels whose value is a list of file links.
FILE_TYPE_FIELDS = (
    "MTA file",
    "ABS related files",
)
# Labels whose value is a max/min pair.
MAX_MIN_TYPE_FIELDS = (
    "Tested temperature growth range",
    "Recommended growth temperature",
)
# Labels whose list value is exchanged as a single ';'-separated string.
LIST_TYPES_TO_JOIN = (
    'Other denomination',
    'Plasmids collections fields',
    'Plasmids',
)

# Marker type -> name of the Biolomics field that holds its sequences.
MARKER_TYPE_MAPPING = {
    '16S rRNA': 'Sequences 16s',  # or Sequences c16S rRNA
    'ACT': 'Sequences ACT',
    'CaM': 'Sequences CaM',
    'EF-1α': 'Sequences TEF1a',
    'ITS': 'Sequences ITS',
    'LSU': 'Sequences LSU',
    'RPB1': 'Sequences RPB1',
    'RPB2': 'Sequences RPB2',
    'TUBB': 'Sequences TUB',  # or Sequences Beta tubulin
}
|
||
|
||
|
||
def serialize_to_biolomics(strain: StrainMirri, client=None, update=False,
                           log_fhand=None):  # sourcery no-metrics
    """Convert a MIRRI strain into the record structure sent to Biolomics.

    Every entry of ``MIRRI_FIELDS`` that declares a ``biolomics`` mapping is
    read from ``strain`` and translated to the representation used for that
    field.  Attributes whose value is ``None`` are left out.

    Args:
        strain: The strain to serialize.
        client: Optional Biolomics client used to resolve linked records
            (taxonomy, growth media, country, ontobiotope, literature and
            sequence markers).  Without it growth media, country and markers
            are omitted and the remaining linked fields are passed through
            unresolved.
        update: When true the result carries the strain's ``RecordId`` and
            ``RecordName``; otherwise it carries ``"Acronym": "MIRRI"``.
        log_fhand: Writable text stream that receives warnings; defaults to
            ``sys.stdout``.

    Returns:
        dict: ``{"RecordDetails": {...}}`` plus the keys described under
        ``update``.

    Raises:
        KeyError: If a restriction-on-use or Nagoya value has no translation.
        ValueError: If a recommended growth medium cannot be found through
            ``client``.
    """
    if log_fhand is None:
        log_fhand = sys.stdout
    strain_record_details = {}

    for field in MIRRI_FIELDS:
        try:
            biolomics_field = field["biolomics"]["field"]
            biolomics_type = field["biolomics"]["type"]
        except KeyError:
            # This MIRRI field has no Biolomics mapping configured.
            continue

        label = field["label"]
        attribute = field["attribute"]
        value = rgetattr(strain, attribute, None)
        if value is None:
            continue

        if label == "Accession number":
            value = f"{strain.id.collection} {strain.id.number}"
        elif label == "Restrictions on use":
            value = RESTRICTION_USE_TRANSLATOR[value]
        elif label == "Nagoya protocol restrictions and compliance conditions":
            value = NAGOYA_TRANSLATOR[value]
        elif label in FILE_TYPE_FIELDS:
            value = [{"Name": "link", "Value": fname} for fname in value]
        elif label == "Other culture collection numbers":
            value = "; ".join(on.strain_id for on in value) if value else None
        elif label in BOOLEAN_TYPE_FIELDS:
            value = 'yes' if value else 'no'
        elif label == 'GMO':
            # Exact match on purpose: ``label in 'GMO'`` is a substring test
            # that would also capture labels such as '' or 'G'.
            # Unlike the other boolean fields, GMO is capitalised.
            value = 'Yes' if value else 'No'
        elif label == "Organism type":
            org_types = [ot.name for ot in value]
            value = []
            for ot in ORG_TYPES.keys():
                is_organism = "yes" if ot in org_types else "no"
                value.append({"Name": ot, "Value": is_organism})
        elif label == 'Taxon name':
            if client:
                taxa = strain.taxonomy.long_name.split(';')
                value = []
                for taxon_name in taxa:
                    taxon = get_remote_rlink(client, TAXONOMY_WS, taxon_name)
                    if taxon:
                        value.append(taxon)
                if not value:
                    msg = f'WARNING: {strain.taxonomy.long_name} not found in database'
                    log_fhand.write(msg + '\n')
                    # TODO: decide to raise or not if taxon not in MIRRI DB
                    # raise ValueError(msg)
        elif label in DATE_TYPE_FIELDS:
            year = value._year
            if year is None:
                continue
            # Missing month/day default to the first one.
            month = value._month or 1
            day = value._day or 1
            value = f"{year}-{month:02}-{day:02}"
        elif label == 'History of deposit':
            value = " < ".join(value)
        elif label in MAX_MIN_TYPE_FIELDS:
            # A scalar is used as both bounds of the range.
            if isinstance(value, (int, float, str)):
                _max, _min = float(value), float(value)
            else:
                _max, _min = float(value['max']), float(value['min'])

            # Ranges use MaxValue/MinValue instead of the generic "Value".
            content = {"MaxValue": _max, "MinValue": _min,
                       "FieldType": biolomics_type}
            strain_record_details[biolomics_field] = content
            continue
        elif label in LIST_TYPES_TO_JOIN:
            value = '; '.join(value)
        # TODO: Check how to deal with crossrefs
        elif label == "Recommended medium for growth":
            if client is None:
                # Media are linked records; they cannot be built offline.
                continue
            ref_value = []
            for medium in value:
                ws_gm = client.retrieve_by_name(GROWTH_MEDIUM_WS, medium)
                if ws_gm is None:
                    raise ValueError(
                        f'Can not find the growth medium: {medium}')
                gm = {"Name": {"Value": medium, "FieldType": "E"},
                      "RecordId": ws_gm.record_id}
                ref_value.append(gm)
            value = ref_value
        elif label == "Form of supply":
            # One yes/no entry for every allowed form of supply.
            value = [
                {"Name": form, "Value": "yes" if form in value else "no"}
                for form in ALLOWED_FORMS_OF_SUPPLY
            ]
        elif label == "Coordinates of geographic origin":
            value = {'Latitude': strain.collect.location.latitude,
                     'Longitude': strain.collect.location.longitude}
            precision = strain.collect.location.coord_uncertainty
            if precision is not None:
                value['Precision'] = precision
        elif label == "Geographic origin":
            # The country goes to its own 'Country' link field; the remaining
            # location parts are joined into this field.
            if client is not None and value.country is not None:
                country = get_pycountry(value.country)
                if country is None:
                    log_fhand.write(f'WARNING: {value.country} Not a valida country code/name\n')
                else:
                    _value = get_country_record(country, client)
                    if _value is None:  # TODO: Remove this once the countries are added to the DB
                        msg = f'WARNING: {value.country} not in MIRRI DB'
                        log_fhand.write(msg + '\n')
                        # raise ValueError(msg)
                    else:
                        content = {"Value": [_value], "FieldType": "RLink"}
                        strain_record_details['Country'] = content
            _value = []
            for sector in ('state', 'municipality', 'site'):
                sector_val = getattr(value, sector, None)
                if sector_val:
                    _value.append(sector_val)
            value = "; ".join(_value) if _value else None
            if value is None:
                continue
        elif label == "Ontobiotope":
            if client and value:
                onto = get_remote_rlink(client, ONTOBIOTOPE_WS, value)
                value = [onto] if onto is not None else None
        elif label == 'Literature':
            if client and value:
                pub_rlinks = []
                for pub in value:
                    rlink = get_remote_rlink(client, BIBLIOGRAPHY_WS, pub.title)
                    if rlink:
                        pub_rlinks.append(rlink)
                if pub_rlinks:
                    value = pub_rlinks
                else:
                    continue
        elif label == 'Ploidy':
            value = _translate_polidy(value)

        if value is not None:
            content = {"Value": value, "FieldType": biolomics_type}
            strain_record_details[biolomics_field] = content

    # Markers are linked records, so they need the client.
    if client:
        add_markers_to_strain_details(client, strain, strain_record_details)

    strain_structure = {"RecordDetails": strain_record_details}
    if update:
        strain_structure['RecordId'] = strain.record_id
        strain_structure['RecordName'] = strain.record_name
    else:
        strain_structure["Acronym"] = "MIRRI"

    return strain_structure
|
||
|
||
|
||
def add_markers_to_strain_details(client, strain: StrainMirri, details):
    """Add the strain's sequence markers to a Biolomics record-details dict.

    Each marker of ``strain.genetics.markers`` is looked up by its
    ``marker_id`` in the sequence endpoint.  Markers that are not found are
    reported with ``print`` and skipped.  Found markers are stored in
    ``details`` (modified in place) under the Biolomics field that
    ``MARKER_TYPE_MAPPING`` assigns to their marker type.
    """
    for marker in strain.genetics.markers:
        remote = client.retrieve_by_name(SEQUENCE_WS, marker.marker_id)
        if remote is None:
            print('Marker not in web service')
            continue

        link = {
            "Name": {"Value": remote.record_name, "FieldType": "E"},
            "RecordId": remote.record_id,
        }
        sequence = remote.marker_seq
        if sequence:
            # Attach the sequence itself when the remote record has one.
            link["TargetFieldValue"] = {
                "Value": {"Sequence": sequence},
                "FieldType": "N",
            }

        target_field = MARKER_TYPE_MAPPING[marker.marker_type]
        details[target_field] = {"Value": [link], "FieldType": "NLink"}
|
||
|
||
|
||
def get_remote_rlink(client, endpoint, record_name):
    """Look up a record by name and return it as a record-link dict.

    Returns ``{"Name": {"Value": <name>, "FieldType": "E"}, "RecordId": <id>}``
    for the record that ``client.retrieve_by_name`` finds, or ``None`` when
    the lookup returns nothing.
    """
    found = client.retrieve_by_name(endpoint, record_name)
    if not found:
        return None

    try:
        name, identifier = found.record_name, found.record_id
    except AttributeError:
        # Some endpoints do not turn the JSON into a Python object yet, so
        # fall back to reading the raw dictionary.
        name, identifier = found["RecordName"], found["RecordId"]

    return {"Name": {"Value": name, "FieldType": "E"},
            "RecordId": identifier}
|
||
|
||
|
||
def add_strain_rlink_to_entity(record, strain_id, strain_name):
    """Set the 'Strains' record link of ``record`` to the given strain.

    ``record['RecordDetails']`` is modified in place; the same ``record``
    object is returned.
    """
    strain_link = {
        'Name': {'Value': strain_name, 'FieldType': "E"},
        'RecordId': strain_id,
    }
    record['RecordDetails']['Strains'] = {
        "FieldType": "RLink",
        'Value': [strain_link],
    }
    return record
|
||
|
||
|
||
# Numeric ploidy code -> name written to Biolomics.
PLOIDY_TRANSLATOR = {
    0: 'Aneuploid',
    1: 'Haploid',
    2: 'Diploid',
    3: 'Triploid',
    4: 'Tetraploid',
    9: 'Polyploid',
}

# Inverse lookup: Biolomics ploidy name -> numeric code.
REV_PLOIDY_TRANSLATOR = {
    name: code for code, name in PLOIDY_TRANSLATOR.items()
}
|
||
|
||
|
||
def _translate_polidy(ploidy):
    """Translate a numeric ploidy code into its Biolomics name.

    Args:
        ploidy: Anything ``int()`` accepts (number or numeric string).

    Returns:
        str: The name from ``PLOIDY_TRANSLATOR``; ``'Polyploid'`` for integer
        codes that are not listed there; ``'?'`` when ``ploidy`` cannot be
        converted to an integer.
    """
    try:
        ploidy = int(ploidy)
    except (TypeError, ValueError):
        # int() raises TypeError for e.g. None and ValueError for
        # non-numeric strings; both mean the ploidy is unknown.
        return '?'
    # Codes without their own entry are reported as polyploid.
    return PLOIDY_TRANSLATOR.get(ploidy, 'Polyploid')
|
||
|
||
|
||
def serialize_from_biolomics(biolomics_strain, client=None):  # sourcery no-metrics
    """Build a ``StrainMirri`` from a Biolomics strain record.

    Every entry of ``MIRRI_FIELDS`` that declares a ``biolomics`` mapping is
    looked up in ``biolomics_strain['RecordDetails']``; fields that are
    missing or flagged ``IsEmpty`` are skipped.  The country and the sequence
    markers are not part of ``MIRRI_FIELDS`` and are handled after the loop.

    Args:
        biolomics_strain: Mapping with ``RecordDetails`` and, optionally,
            ``RecordId`` and ``RecordName``.
        client: Optional Biolomics client.  When given, literature and
            sequence markers are fetched by record id; without it markers are
            not loaded and literature is stored as received.

    Returns:
        StrainMirri: The populated strain.

    Raises:
        KeyError: If ``RecordDetails`` is missing, or a restriction-on-use or
            Nagoya text has no reverse translation.
    """
    strain = StrainMirri()
    strain.record_id = biolomics_strain.get('RecordId', None)
    strain.record_name = biolomics_strain.get('RecordName', None)
    record_details = biolomics_strain['RecordDetails']

    for field in MIRRI_FIELDS:
        try:
            biolomics_field = field["biolomics"]["field"]
        except KeyError:
            # This MIRRI field has no Biolomics mapping configured.
            continue

        label = field["label"]
        attribute = field["attribute"]
        field_data = record_details.get(biolomics_field, None)
        if field_data is None:
            continue
        if field_data.get('IsEmpty'):
            continue

        if biolomics_field in MAX_MIN_TYPE_FIELDS:
            # Range fields carry MaxValue/MinValue instead of "Value".
            value = {'max': field_data.get('MaxValue', None),
                     'min': field_data.get('MinValue', None)}
        else:
            value = field_data['Value']

        if label == 'Accession number':
            # The Biolomics record name is kept as a synonym; the accession
            # number itself ("<collection> <number>") becomes the strain id.
            number = strain.record_name
            mirri_id = StrainId(number=number)
            strain.synonyms = [mirri_id]
            coll, num = value.split(' ', 1)
            accession_number_id = StrainId(collection=coll, number=num)
            strain.id = accession_number_id
            continue
        elif label == "Restrictions on use":
            value = REV_RESTRICTION_USE_TRANSLATOR[value]
        elif label == 'Nagoya protocol restrictions and compliance conditions':
            value = REV_NAGOYA_TRANSLATOR[value]
        elif label in FILE_TYPE_FIELDS:
            value = [f['Value'] for f in value]
        elif label == "Other culture collection numbers":
            other_numbers = []
            for on in value.split(";"):
                on = on.strip()
                try:
                    collection, number = on.split(" ", 1)
                except ValueError:
                    # No space: the whole token is the number.
                    collection = None
                    number = on
                _id = StrainId(collection=collection, number=number)
                other_numbers.append(_id)
            value = other_numbers
        elif label in BOOLEAN_TYPE_FIELDS:
            value = value == 'yes'
        elif label == 'GMO':
            value = value == 'Yes'
        elif label == "Organism type":
            organism_types = [OrganismType(item['Name'])
                              for item in value if item['Value'] == 'yes']
            if organism_types:
                value = organism_types
        elif label == 'Taxon name':
            # Exact match on purpose: ``label in 'Taxon name'`` is a substring
            # test that would also capture labels such as '' or 'name'.
            value = ";".join([v['Name']['Value'] for v in value])
            add_taxon_to_strain(strain, value)
            continue
        elif label in DATE_TYPE_FIELDS:
            value = DateRange().strpdate(value)
        elif label in MAX_MIN_TYPE_FIELDS:
            # Skip the range unless both bounds are set.  The check on 'min'
            # used ``and``, which can never be true.
            # NOTE(review): 0 is treated as "unset" for both bounds; confirm
            # that this matches what the web service sends for empty values.
            if (value['max'] is None or value['max'] == 0 or
                    value['min'] is None or value['min'] == 0):
                continue
        elif label == "Recommended medium for growth":
            value = [v['Name']['Value'] for v in value]
        elif label == "Form of supply":
            value = [item['Name'] for item in value if item['Value'] == 'yes']
        elif label in LIST_TYPES_TO_JOIN:
            value = [v.strip() for v in value.split(";")]
        elif label == "Coordinates of geographic origin":
            if ('Longitude' in value and 'Latitude' in value and
                    isinstance(value['Longitude'], float) and
                    isinstance(value['Latitude'], float)):
                strain.collect.location.longitude = value['Longitude']
                strain.collect.location.latitude = value['Latitude']
                # 'Precision' may be absent; it is only written when the
                # strain has a coordinate uncertainty.
                precision = value.get('Precision')
                if precision is not None and precision != 0:
                    strain.collect.location.coord_uncertainty = precision
            continue
        elif label == "Altitude of geographic origin":
            value = float(value)
        elif label == "Geographic origin":
            strain.collect.location.site = value
            continue
        elif label == 'Ontobiotope':
            try:
                value = re.search("(OBT:[0-9]{5,7})", value[0]['Name']['Value']).group()
            except (KeyError, IndexError, AttributeError):
                continue
        elif label == 'Ploidy':
            # Values without a numeric code (e.g. '?') are skipped instead of
            # raising KeyError.
            if value not in REV_PLOIDY_TRANSLATOR:
                continue
            value = REV_PLOIDY_TRANSLATOR[value]
        elif label == 'Literature':
            if client is not None:
                pubs = []
                for pub in value:
                    pub = client.retrieve_by_id(BIBLIOGRAPHY_WS, pub['RecordId'])
                    pubs.append(pub)
                value = pubs

        rsetattr(strain, attribute, value)

    # Fields that are not in the MIRRI field list.
    # Country
    if 'Country' in record_details and record_details['Country']:
        try:
            country_name = record_details['Country']['Value'][0]['Name']['Value']
            country = get_pycountry(country_name)
            country_3 = country.alpha_3 if country else None
        except (IndexError, KeyError, TypeError):
            # Empty, malformed or null link value: no country.
            country_3 = None
        if country_3:
            strain.collect.location.country = country_3

    # Markers
    if client:
        markers = []
        for marker_type, biolomics_marker in MARKER_TYPE_MAPPING.items():
            try:
                marker_value = record_details[biolomics_marker]['Value']
            except KeyError:
                continue
            if not marker_value:
                continue

            for marker in marker_value:
                record_id = marker['RecordId']
                marker = client.retrieve_by_id(SEQUENCE_WS, record_id)
                if marker is not None:
                    markers.append(marker)
        if markers:
            strain.genetics.markers = markers

    return strain
|
||
|
||
|
||
def get_country_record(country, client):
    """Return the record link of ``country`` in the country endpoint.

    The country's ``common_name``, ``name`` and ``official_name`` are tried in
    that order; the first one that resolves to a remote record wins.  Returns
    ``None`` when none of them does.
    """
    candidate_names = (
        getattr(country, attr, None)
        for attr in ('common_name', 'name', 'official_name')
    )
    for candidate in candidate_names:
        if candidate is None:
            continue
        rlink = get_remote_rlink(client, COUNTRY_WS, candidate)
        if rlink is not None:
            return rlink
    return None
|