import re from pathlib import Path from io import BytesIO from zipfile import BadZipfile from datetime import datetime from calendar import monthrange import requests from openpyxl import load_workbook import pycountry from mirri.io.parsers.excel import workbook_sheet_reader, get_all_cell_data_from_sheet from mirri.validation.error_logging import ErrorLog, Error from mirri.validation.tags import (CHOICES, COLUMNS, COORDINATES, CROSSREF, CROSSREF_NAME, DATE, ERROR_CODE, FIELD, MANDATORY, MATCH, MISSING, MULTIPLE, NAGOYA, REGEXP, ROW_VALIDATION, SEPARATOR, TAXON, TYPE, UNIQUE, VALIDATION, VALUES, BIBLIO, DOMINIO,URL_DOMINIO, ISO, URL_TITLE,JUST_URL,TITLE, HISTORY,NAGOYA1, VERSION) from mirri.settings import LOCATIONS, SUBTAXAS from mirri.validation.validation_conf_12052023 import version_config from mirri.validation.validation_conf_12052023 import MIRRI_12052023_VALLIDATION_CONF def validate_mirri_excel(fhand, version= "5.1.2" ): if version == "5.1.2": configuration = MIRRI_12052023_VALLIDATION_CONF else: raise NotImplementedError("Only version 5.1.2 is implemented") return validate_excel(fhand, configuration) def version(value , validation_conf=None): if value is None: return True try: for version in version_config: if value == version : return True except: return False def validate_country_code(value,validation_conf=None): if value is None: return True try: if pycountry.countries.get(alpha_2=value) or pycountry.countries.get(alpha_3=value) or pycountry.historic_countries.get(alpha_4 = value): return True except: return False def validate_excel(fhand, configuration): validation_conf = configuration['sheet_schema'] cross_ref_conf = configuration['cross_ref_conf'] in_memory_sheet_conf = configuration['keep_sheets_in_memory'] excel_name = Path(fhand.name).stem error_log = ErrorLog(excel_name) try: workbook = load_workbook(filename=BytesIO( fhand.read()), read_only=True, data_only=True) except (BadZipfile, IOError): error = Error('EXL00', fhand.name, fhand.name) error_log.add_error(error) return error_log # excel structure errors structure_errors = list(validate_excel_structure(workbook, validation_conf)) if structure_errors: for error in structure_errors: error = Error(error[ERROR_CODE], pk=error['id'], data=error['value']) error_log.add_error(error) return error_log crossrefs = get_all_crossrefs(workbook, cross_ref_conf) in_memory_sheets = get_all_in_memory_sheet(workbook, in_memory_sheet_conf) content_errors = validate_content(workbook, validation_conf, crossrefs, in_memory_sheets) for error in content_errors: # if error[ERROR_CODE] == 'STD43': # continue error = Error(error[ERROR_CODE], pk=error['id'], data=error['value']) error_log.add_error(error) return error_log def validate_excel_structure(workbook, validation_conf): for sheet_name, sheet_conf in validation_conf.items(): mandatory = sheet_conf.get(VALIDATION, {}).get(TYPE, None) mandatory = mandatory == MANDATORY error_code = sheet_conf.get(VALIDATION, {}).get(ERROR_CODE, False) try: sheet = workbook[sheet_name] except KeyError: sheet = None if sheet is None: if mandatory: yield {'id': None, 'sheet': sheet_name, 'field': None, 'error_code': error_code, 'value': None} continue headers = _get_sheet_headers(sheet) for column in sheet_conf.get(COLUMNS): field = column[FIELD] for step in column.get(VALIDATION, []): if step[TYPE] == MANDATORY and field not in headers: yield {'id': None, 'sheet': sheet_name, 'field': field, 'error_code': step[ERROR_CODE], 'value': None} def _get_sheet_headers(sheet): first_row = next(sheet.iter_rows(min_row=1, max_row=1)) return [c.value for c in first_row] def _get_values_from_columns(workbook, sheet_name, columns): indexed_values = {} for row in workbook_sheet_reader(workbook, sheet_name): for col in columns: indexed_values[str(row.get(col))] = "" return indexed_values def get_all_crossrefs(workbook, cross_refs_names): crossrefs = {} for ref_name, columns in cross_refs_names.items(): if columns: crossrefs[ref_name] = _get_values_from_columns(workbook, ref_name, columns) else: try: crossrefs[ref_name] = get_all_cell_data_from_sheet(workbook, ref_name) except ValueError as error: if 'sheet is missing' in str(error): crossrefs[ref_name] = [] else: raise return crossrefs def get_all_in_memory_sheet(workbook, in_memory_sheet_conf): in_memory_sheets = {} for sheet_conf in in_memory_sheet_conf: sheet_name = sheet_conf['sheet_name'] indexed_by = sheet_conf['indexed_by'] rows = workbook_sheet_reader(workbook, sheet_name) indexed_rows = {row[indexed_by]: row for row in rows} in_memory_sheets[sheet_name] = indexed_rows return in_memory_sheets def validate_content(workbook, validation_conf, crossrefs, in_memory_sheets): for sheet_name in validation_conf.keys(): sheet_conf = validation_conf[sheet_name] sheet_id_column = sheet_conf['id_field'] shown_values = {} row_validation_steps = sheet_conf.get(ROW_VALIDATION, None) for row in workbook_sheet_reader(workbook, sheet_name): id_ = row.get(sheet_id_column, None) if id_ is None: error_code = _get_missing_row_id_error(sheet_id_column, sheet_conf) yield {'id': id_, 'sheet': sheet_name, 'field': sheet_id_column, 'error_code': error_code, 'value': None} continue do_have_cell_error = False for column in sheet_conf[COLUMNS]: label = column[FIELD] validation_steps = column.get(VALIDATION, None) value = row.get(label, None) if validation_steps: error_code = validate_cell(value, validation_steps, crossrefs, shown_values, label) if error_code is not None: do_have_cell_error = True yield {'id': id_, 'sheet': sheet_name, 'field': label, 'error_code': error_code, 'value': value} if not do_have_cell_error and row_validation_steps: error_code = validate_row( row, row_validation_steps, in_memory_sheets) if error_code is not None: yield {'id': id_, 'sheet': sheet_name, 'field': 'row', 'error_code': error_code, 'value': 'row'} def _get_missing_row_id_error(sheet_id_column, sheet_conf): error_code = None for column in sheet_conf[COLUMNS]: if column[FIELD] == sheet_id_column: error_code = [step[ERROR_CODE] for step in column[VALIDATION] if step[TYPE] == MISSING][0] return error_code def validate_row(row, validation_steps, in_memory_sheets): for validation_step in validation_steps: kind = validation_step[TYPE] error_code = validation_step[ERROR_CODE] if kind == NAGOYA: if not is_valid_nagoya_v12052023(row, in_memory_sheets): return error_code elif kind == BIBLIO: if not is_valid_pub(row): return error_code elif kind == NAGOYA1: if not is_valid_nago(row): return error_code else: msg = f'{kind} is not a recognized row validation type method' raise NotImplementedError(msg) def validate_cell(value, validation_steps, crossrefs, shown_values, label): for step_conf in validation_steps: if step_conf[TYPE] == MANDATORY: continue step_conf['crossrefs_pointer'] = crossrefs step_conf['shown_values'] = shown_values step_conf['label'] = label error_code = validate_value(value, step_conf) if error_code is not None: return error_code def is_valid_pub(row): pub_id = row.get('ID', None) pub_pmid = row.get('PMID', None) pub_doi = row.get('DOI', None) title = row.get('Title', None) full_reference = row.get('Full reference', None) authors = row.get('Authors', None) journal = row.get('Journal', None) year = row.get('Year', None) volumen = row.get('Volume', None) first_page = row.get('First page', None) book_title = row.get('Book title', None) editors = row.get('Editors', None) publishers = row.get('Publishers', None) if (pub_id != None and pub_doi != None) or (pub_id != None and pub_pmid != None) or (pub_id != None and full_reference != None) or (pub_id != None and authors != None and title != None and journal != None and year != None and volumen != None and first_page != None) : return True is_journal = bool(title) # if (is_journal and (not authors or not journal or not not year or # not volumen or not first_page)): # return False #if (not is_journal and (not authors or not year or # not editors or not publishers or not book_title)): # return False return False def is_valid_nago(row): if not row: return True status = row.get("status", None) type = row.get("type", None) regex = r'^[a-zA-Z\s.\'-]+$' if status != None and type != None: if (re.match(regex, status) and type==1): return False if (type == 2 and status is None): return False return True def parsee_mirri_excel(row, in_memory_sheets, version=""): if version == "12052023": return is_valid_nagoya_v12052023 (row, in_memory_sheets) else: raise NotImplementedError("Only version is implemented") def is_valid_nagoya_v12052023(row, in_memory_sheets): # sourcery skip: return-identity location_index = row.get('geographicOrigin', None) if location_index is None: country = None else: geo_origin = in_memory_sheets[LOCATIONS].get(location_index, {}) country = geo_origin.get('Country', None) _date = row.get("collectionDate", None) if _date is None: _date = row.get("isolationDate", None) if _date is None: _date = row.get("depositDate", None) if _date is None: _date = row.get("accessionDate", None) if _date is not None: year = _date.year if isinstance(_date, datetime) else int(str(_date)[:4]) else: year = None if year is not None and year >= 2014 and country is None: return False return True def is_valid_regex(value, validation_conf): if value is None: return True value = str(value) regexp = validation_conf[MATCH] multiple = validation_conf.get(MULTIPLE, False) separator = validation_conf.get(SEPARATOR, None) values = [v.strip() for v in value.split( separator)] if multiple else [value] for value in values: matches_regexp = re.fullmatch(regexp, value) if not matches_regexp: return False return True def is_valid_crossrefs(value, validation_conf): crossref_name = validation_conf[CROSSREF_NAME] crossrefs = validation_conf['crossrefs_pointer'] choices = crossrefs[crossref_name] if value is None or not choices: return True value = str(value) multiple = validation_conf.get(MULTIPLE, False) separator = validation_conf.get(SEPARATOR, None) if value is None: return True if multiple: values = [v.strip() for v in value.split(separator)] else: values = [value.strip()] return all(value in choices for value in values) def is_valid_choices(value, validation_conf): if value is None: return True choices = validation_conf[VALUES] multiple = validation_conf.get(MULTIPLE, False) separator = validation_conf.get(SEPARATOR, None) if multiple: values = [v.strip() for v in str(value).split(separator)] else: values = [str(value).strip()] sorted_values = sorted(values) if sorted_values != values: return False return all(value in choices for value in values) def is_valid_date(value, validation_conf): if value is None: return True if isinstance(value, datetime): year = value.year month = value.month day = value.day elif isinstance(value, int): year = value month = None day = None elif isinstance(value, str): value = value.replace('-', '') value = value.replace('/', '') month = None day = None try: year = int(value[: 4]) if len(value) >= 6: month = int(value[4: 6]) if len(value) >= 8: day = int(value[6: 8]) except (IndexError, TypeError, ValueError): return False else: return False if year < 1700 or year > datetime.now().year: return False if month is not None: if month < 1 or month > 13: return False if day is not None and (day < 1 or day > monthrange(year, month)[1]): return False return True def is_valid_dominio(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split(";")] if len(items) >1: for i in range(0, len(items),2): nameSite = str(items[i]) urlSite = str(items[i+1]) dominio = urlSite.split(".")[-2] if nameSite.lower() != dominio: return False return True except: return False def is_valid_title(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split(";")] if len(items) >1: for i in range(0, len(items),2): nameSite = (items[i]) urlSite = str(items[i+1]) regex = r'^(http|https):\/\/[a-z0-9\-\.]+\.[a-z]{2,}([/a-z0-9\-\.]*)*$' if re.match(regex, nameSite) or isinstance(nameSite, int) or nameSite == '': return False return True except: return False def is_valid_url_title(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split(";")] if len(items) ==1: urlSite = str(items[0]) response = requests.head(urlSite) if response.status_code != 200: return False else: items = [i.strip() for i in value.split(";")] for i in range(0, len(items),2): nameSite = (items[i]) urlSite = str(items[i+1]) response = requests.head(urlSite) if response.status_code != 200: return False return True except: return False def is_valid_url_dominio(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split(";")] for i in range(0, len(items),2): nameSite = str(items[i]) urlSite = str(items[i+1]) response = requests.head(urlSite) if response.status_code != 200: return False return True except: return False def is_valid_just_url(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split(";")] for i in items: nameSite = str(items[0]) response = requests.head(i) if response.status_code != 200: return False return True except: return False def is_valid_history(value, validation_conf=None): if value is None: return True try: items = [i.strip() for i in value.split("<")] for i in items: regex1 = r'^[a-zA-Z0-9 &,;.:''-]+,?\s*((19|20)\d{2})' regex2 = r'^[a-zA-Z0-9 &,;.:''-]+,?\s*[a-zA-Z0-9 &,;.''-] (19|20)\d{2}\s\([a-zA-Z0-9 &,;.''-:]+\)' regex3 = r'^[a-zA-Z0-9 &,;.:''-]+\,?\s*[a-zA-Z0-9 &,;.''-]' regex4 = r'^[a-zA-Z0-9 &,;.''-]+,?\s*(19|20)\d{2}\s\([a-zA-Z0-9 .''-,;&:]+\)' regex5 = r'^[a-zA-Z0-9 &,;.:''-]+,?\s*\([a-zA-Z0-9 &,;.''-:]+\) (19|20)\d{2}' if re.match(regex1, i): return True elif re.match(regex2, i): return True elif re.match(regex3, i): return True elif re.match(regex4, i): return True elif re.match(regex5, i): return True else: return False except: return False def is_valid_coords(value, validation_conf=None): if value is None: return True try: regex1 = r'^-?(90(\.0+)?|[1-8]?\d(\.\d+)?)(\s*;\s*-?(180(\.0+)?|((1[0-7]\d)|(\d{1,2}))(\.\d+)?))*$' regex2 = r'^-?(90(\.0+)?|[1-8]?\d(\.\d+)?)\s*;\s*-?(180(\.0+)?|((1[0-7]\d)|(\d{1,2}))(\.\d+)?)\s*;\s*(\d+\.\d+|\?)\s*;\s*(\d+\.\d+|\?)$|^(\d+\.\d+|\?)$|^\s*;\s*$' if not re.match(regex1, value) and not re.match(regex2, value): return False return True except: return False def is_valid_missing(value, validation_conf=None): return value is not None def is_valid_taxon(value, validation_conf=None): multiple = validation_conf.get(MULTIPLE, False) separator = validation_conf.get(SEPARATOR, ';') value = value.split(separator) if multiple else [value] for taxon in value: taxon = taxon.strip() if not _is_valid_taxon(taxon): return False return True def _is_valid_taxon(value): value = value.strip() if not value: return True items = re.split(r" +", value) genus = items[0] if len(items) > 1: species = items[1] if species in ("sp", "spp", ".sp", "sp."): return False if len(items) > 2: for index in range(0, len(items[2:]), 2): rank = SUBTAXAS.get(items[index + 2], None) if rank is None: print(value) return False return True def is_valid_unique(value, validation_conf): if not value: return True label = validation_conf['label'] shown_values = validation_conf['shown_values'] if label not in shown_values: shown_values[label] = {} already_in_file = shown_values[label] if value in already_in_file: return False # NOTE: what's the use of this? # What is the expected format for value and shown_values? shown_values[label][value] = None return True def is_valid_file(path): try: with path.open("rb") as fhand: error_log = validate_mirri_excel(fhand) if "EXL" in error_log.get_errors(): return False except: return False return True VALIDATION_FUNCTIONS = { MISSING: is_valid_missing, REGEXP: is_valid_regex, CHOICES: is_valid_choices, CROSSREF: is_valid_crossrefs, DATE: is_valid_date, COORDINATES: is_valid_coords, TAXON: is_valid_taxon, TITLE: is_valid_title, DOMINIO: is_valid_dominio, URL_TITLE: is_valid_url_title, URL_DOMINIO: is_valid_url_dominio, JUST_URL: is_valid_just_url, ISO: validate_country_code, HISTORY: is_valid_history, VERSION: version, UNIQUE: is_valid_unique} def validate_value(value, step_conf): kind = step_conf[TYPE] try: is_value_valid = VALIDATION_FUNCTIONS[kind] except KeyError: msg = f'This validation type {kind} is not implemented' raise NotImplementedError(msg) error_code = step_conf[ERROR_CODE] if not is_value_valid(value, step_conf): return error_code