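"""Validation of MIRRI Excel workbooks.

Checks a strain-catalogue workbook against a versioned schema: first the
workbook structure, then per-cell and per-row content rules, accumulating
every problem into an ErrorLog.
"""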
import re
from calendar import monthrange
from datetime import datetime
from io import BytesIO
from pathlib import Path
from zipfile import BadZipfile

import pycountry
import requests
from openpyxl import load_workbook

from mirri.io.parsers.excel import (get_all_cell_data_from_sheet,
                                    workbook_sheet_reader)
from mirri.settings_v1 import LOCATIONS, SUBTAXAS
from mirri.validation.error_logging import Error, ErrorLog
from mirri.validation.tags import (BIBLIO, CHOICES, COLUMNS, COORDINATES,
                                   CROSSREF, CROSSREF_NAME, DATE, DOMINIO,
                                   ERROR_CODE, FIELD, HISTORY, ISO, JUST_URL,
                                   MANDATORY, MATCH, MISSING, MULTIPLE, NAGOYA,
                                   NAGOYA1, REGEXP, ROW_VALIDATION, SEPARATOR,
                                   TAXON, TITLE, TYPE, UNIQUE, URL_DOMINIO,
                                   URL_TITLE, VALIDATION, VALUES, VERSION)
from mirri.validation.validation_conf_12052023 import version_config

def validate_mirri_excel(fhand, version="", date=""):
    configuration = version_config.get(version)
    if configuration is None:
        raise NotImplementedError(f"Unsupported version: {version}")
    configuration["date"] = date or configuration.get("date")
    if configuration["date"] != "12/05/2023":
        raise ValueError("Invalid date. Expected: 12/05/2023")
    return validate_excel(fhand, configuration)
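
# Illustrative usage (the file name is an assumption; version_config in this
# module is keyed by version strings such as "12052023"):
#
#     with open("strains.xlsx", "rb") as fhand:
#         error_log = validate_mirri_excel(fhand, version="12052023",
#                                          date="12/05/2023")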

def version(value, validation_conf=None):
    # A missing value is accepted; otherwise the value must be one of the
    # known configuration versions.
    if value is None:
        return True
    return value in version_config

def validate_country_code(value, validation_conf=None):
    if value is None:
        return True
    try:
        country = (pycountry.countries.get(alpha_2=value)
                   or pycountry.countries.get(alpha_3=value)
                   or pycountry.historic_countries.get(alpha_4=value))
        return country is not None
    except (KeyError, LookupError):
        return False
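
# Accepts ISO 3166 alpha-2 and alpha-3 codes plus historic alpha-4 codes; the
# exact data set depends on the installed pycountry version. For example:
#
#     >>> validate_country_code("ES")
#     True
#     >>> validate_country_code("XX")
#     False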

def validate_excel(fhand, configuration):
    validation_conf = configuration['sheet_schema']
    cross_ref_conf = configuration['cross_ref_conf']
    in_memory_sheet_conf = configuration['keep_sheets_in_memory']
    excel_name = Path(fhand.name).stem
    error_log = ErrorLog(excel_name)

    try:
        workbook = load_workbook(filename=BytesIO(fhand.read()),
                                 read_only=True, data_only=True)
    except (BadZipfile, IOError):
        error = Error('EXL00', fhand.name, fhand.name)
        error_log.add_error(error)
        return error_log

    # Excel structure errors: if the workbook layout is broken, report those
    # errors and stop before validating any content.
    structure_errors = list(validate_excel_structure(workbook, validation_conf))
    if structure_errors:
        for error in structure_errors:
            error = Error(error[ERROR_CODE], pk=error['id'],
                          data=error['value'])
            error_log.add_error(error)
        return error_log

    crossrefs = get_all_crossrefs(workbook, cross_ref_conf)
    in_memory_sheets = get_all_in_memory_sheet(workbook, in_memory_sheet_conf)
    content_errors = validate_content(workbook, validation_conf,
                                      crossrefs, in_memory_sheets)
    for error in content_errors:
        error = Error(error[ERROR_CODE], pk=error['id'], data=error['value'])
        error_log.add_error(error)
    return error_log

def validate_excel_structure(workbook, validation_conf):
    for sheet_name, sheet_conf in validation_conf.items():
        mandatory = sheet_conf.get(VALIDATION, {}).get(TYPE, None) == MANDATORY
        error_code = sheet_conf.get(VALIDATION, {}).get(ERROR_CODE, False)
        try:
            sheet = workbook[sheet_name]
        except KeyError:
            sheet = None

        if sheet is None:
            if mandatory:
                yield {'id': None, 'sheet': sheet_name, 'field': None,
                       'error_code': error_code, 'value': None}
            continue

        headers = _get_sheet_headers(sheet)
        for column in sheet_conf.get(COLUMNS):
            field = column[FIELD]
            for step in column.get(VALIDATION, []):
                if step[TYPE] == MANDATORY and field not in headers:
                    yield {'id': None, 'sheet': sheet_name, 'field': field,
                           'error_code': step[ERROR_CODE], 'value': None}

def _get_sheet_headers(sheet):
    first_row = next(sheet.iter_rows(min_row=1, max_row=1))
    return [cell.value for cell in first_row]

def _get_values_from_columns(workbook, sheet_name, columns):
    # Collect every value found in the given columns, stringified and stored
    # as dictionary keys for fast membership checks.
    indexed_values = {}
    for row in workbook_sheet_reader(workbook, sheet_name):
        for col in columns:
            indexed_values[str(row.get(col))] = ""
    return indexed_values

def get_all_crossrefs(workbook, cross_refs_names):
    crossrefs = {}
    for ref_name, columns in cross_refs_names.items():
        if columns:
            crossrefs[ref_name] = _get_values_from_columns(workbook, ref_name,
                                                           columns)
        else:
            try:
                crossrefs[ref_name] = get_all_cell_data_from_sheet(workbook,
                                                                   ref_name)
            except ValueError as error:
                if 'sheet is missing' in str(error):
                    crossrefs[ref_name] = []
                else:
                    raise
    return crossrefs
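
# The result maps each cross-referenced sheet name to the identifiers it
# defines, so later lookups are plain membership tests. Shape sketch (sheet
# and key names here are illustrative):
#
#     {'Geographic origin': {'1': '', '2': ''}, 'Literature': {...}}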

def get_all_in_memory_sheet(workbook, in_memory_sheet_conf):
    in_memory_sheets = {}
    for sheet_conf in in_memory_sheet_conf:
        sheet_name = sheet_conf['sheet_name']
        indexed_by = sheet_conf['indexed_by']
        rows = workbook_sheet_reader(workbook, sheet_name)
        in_memory_sheets[sheet_name] = {row[indexed_by]: row for row in rows}
    return in_memory_sheets

def validate_content(workbook, validation_conf, crossrefs, in_memory_sheets):
    for sheet_name, sheet_conf in validation_conf.items():
        sheet_id_column = sheet_conf['id_field']
        shown_values = {}
        row_validation_steps = sheet_conf.get(ROW_VALIDATION, None)
        for row in workbook_sheet_reader(workbook, sheet_name):
            id_ = row.get(sheet_id_column, None)
            if id_ is None:
                error_code = _get_missing_row_id_error(sheet_id_column,
                                                       sheet_conf)
                yield {'id': id_, 'sheet': sheet_name,
                       'field': sheet_id_column,
                       'error_code': error_code, 'value': None}
                continue
            has_cell_error = False
            for column in sheet_conf[COLUMNS]:
                label = column[FIELD]
                validation_steps = column.get(VALIDATION, None)
                value = row.get(label, None)
                if validation_steps:
                    error_code = validate_cell(value, validation_steps,
                                               crossrefs, shown_values, label)
                    if error_code is not None:
                        has_cell_error = True
                        yield {'id': id_, 'sheet': sheet_name, 'field': label,
                               'error_code': error_code, 'value': value}

            # Row-level checks only run when every cell in the row was valid.
            if not has_cell_error and row_validation_steps:
                error_code = validate_row(row, row_validation_steps,
                                          in_memory_sheets)
                if error_code is not None:
                    yield {'id': id_, 'sheet': sheet_name, 'field': 'row',
                           'error_code': error_code, 'value': 'row'}
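
# Each yielded error is a plain dict consumed by validate_excel, e.g. (all
# field values here are illustrative):
#
#     {'id': 'MIRRI0001', 'sheet': 'Strains', 'field': 'Taxon name',
#      'error_code': 'STD04', 'value': 'Candida sp.'}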

def _get_missing_row_id_error(sheet_id_column, sheet_conf):
    error_code = None
    for column in sheet_conf[COLUMNS]:
        if column[FIELD] == sheet_id_column:
            error_code = [step[ERROR_CODE] for step in column[VALIDATION]
                          if step[TYPE] == MISSING][0]
    return error_code

def validate_row(row, validation_steps, in_memory_sheets):
    for validation_step in validation_steps:
        kind = validation_step[TYPE]
        error_code = validation_step[ERROR_CODE]
        if kind == NAGOYA:
            if not is_valid_nagoya_v20200601(row, in_memory_sheets):
                return error_code
            if not is_valid_nagoya_v12052023(row, in_memory_sheets):
                return error_code
        elif kind == BIBLIO:
            if not is_valid_pub(row):
                return error_code
        elif kind == NAGOYA1:
            if not is_valid_nago(row):
                return error_code
        else:
            msg = f'{kind} is not a recognized row validation type'
            raise NotImplementedError(msg)

def validate_cell(value, validation_steps, crossrefs, shown_values, label):
    for step_conf in validation_steps:
        # MANDATORY is a structural check handled by validate_excel_structure.
        if step_conf[TYPE] == MANDATORY:
            continue
        step_conf['crossrefs_pointer'] = crossrefs
        step_conf['shown_values'] = shown_values
        step_conf['label'] = label
        error_code = validate_value(value, step_conf)
        if error_code is not None:
            return error_code

def is_valid_pub(row):
    pub_id = row.get('ID', None)
    pub_pmid = row.get('PMID', None)
    pub_doi = row.get('DOI', None)
    title = row.get('Title', None)
    full_reference = row.get('Full reference', None)
    authors = row.get('Authors', None)
    journal = row.get('Journal', None)
    year = row.get('Year', None)
    volume = row.get('Volume', None)
    first_page = row.get('First page', None)

    if pub_id is None:
        return False
    # A publication needs an identifier plus either a DOI, a PMID, a full
    # reference, or the complete set of journal citation fields.
    if pub_doi is not None or pub_pmid is not None or full_reference is not None:
        return True
    journal_fields = (authors, title, journal, year, volume, first_page)
    return all(field is not None for field in journal_fields)
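
# Sketch of the rule: an ID plus any of DOI / PMID / full reference passes,
# as does an ID plus the complete set of journal fields; an ID alone fails.
#
#     >>> is_valid_pub({'ID': '1', 'DOI': '10.1000/182'})
#     True
#     >>> is_valid_pub({'ID': '1'})
#     False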

def is_valid_nago(row):
    if not row:
        return True
    status = row.get("status", None)
    type_ = row.get("type", None)
    regex = r'^[a-zA-Z\s.\'-]+$'

    # Type 1 records must not carry a free-text status.
    if status is not None and type_ == 1 and re.match(regex, status):
        return False
    # Type 2 records require a status.
    if type_ == 2 and status is None:
        return False
    return True

def parsee_mirri_excel(row, in_memory_sheets, version=""):
    if version == "20200601":
        return is_valid_nagoya_v20200601(row, in_memory_sheets)
    elif version == "12052023":
        return is_valid_nagoya_v12052023(row, in_memory_sheets)
    else:
        raise NotImplementedError("Only versions 20200601 and 12052023 are implemented")

def is_valid_nagoya_v20200601(row, in_memory_sheets):  # sourcery skip: return-identity
    location_index = row.get('Geographic origin', None)
    if location_index is None:
        country = None
    else:
        geo_origin = in_memory_sheets[LOCATIONS].get(location_index, {})
        country = geo_origin.get('Country', None)

    # Use the first available date, in order of preference.
    _date = row.get("Date of collection", None)
    if _date is None:
        _date = row.get("Date of isolation", None)
    if _date is None:
        _date = row.get("Date of deposit", None)
    if _date is None:
        _date = row.get("Date of inclusion in the catalogue", None)
    if _date is not None:
        year = _date.year if isinstance(_date, datetime) else int(str(_date)[:4])
    else:
        year = None

    # Strains dated after the Nagoya Protocol entered into force (2014) must
    # declare a country of origin.
    if year is not None and year >= 2014 and country is None:
        return False
    return True
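
# For instance, a strain deposited in 2016 whose row carries no geographic
# origin is rejected:
#
#     >>> is_valid_nagoya_v20200601(
#     ...     {'Date of deposit': datetime(2016, 1, 1)}, {LOCATIONS: {}})
#     False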

def is_valid_nagoya_v12052023(row, in_memory_sheets):  # sourcery skip: return-identity
    location_index = row.get('geographicOrigin', None)
    if location_index is None:
        country = None
    else:
        geo_origin = in_memory_sheets[LOCATIONS].get(location_index, {})
        country = geo_origin.get('Country', None)

    # Use the first available date, in order of preference.
    _date = row.get("collectionDate", None)
    if _date is None:
        _date = row.get("isolationDate", None)
    if _date is None:
        _date = row.get("depositDate", None)
    if _date is None:
        _date = row.get("accessionDate", None)
    if _date is not None:
        year = _date.year if isinstance(_date, datetime) else int(str(_date)[:4])
    else:
        year = None

    # Strains dated after the Nagoya Protocol entered into force (2014) must
    # declare a country of origin.
    if year is not None and year >= 2014 and country is None:
        return False
    return True

def is_valid_regex(value, validation_conf):
    if value is None:
        return True
    value = str(value)
    regexp = validation_conf[MATCH]
    multiple = validation_conf.get(MULTIPLE, False)
    separator = validation_conf.get(SEPARATOR, None)

    values = [v.strip() for v in value.split(separator)] if multiple else [value]
    return all(re.fullmatch(regexp, v) for v in values)
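
# Example with an assumed accession-like pattern:
#
#     >>> conf = {MATCH: r'[A-Z]{2}\d+', MULTIPLE: True, SEPARATOR: ';'}
#     >>> is_valid_regex('AB12; CD34', conf)
#     True
#     >>> is_valid_regex('AB12; x', conf)
#     False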

def is_valid_crossrefs(value, validation_conf):
    crossref_name = validation_conf[CROSSREF_NAME]
    crossrefs = validation_conf['crossrefs_pointer']
    choices = crossrefs[crossref_name]
    if value is None or not choices:
        return True
    value = str(value)

    multiple = validation_conf.get(MULTIPLE, False)
    separator = validation_conf.get(SEPARATOR, None)
    if multiple:
        values = [v.strip() for v in value.split(separator)]
    else:
        values = [value.strip()]
    return all(value in choices for value in values)

def is_valid_choices(value, validation_conf):
    if value is None:
        return True
    choices = validation_conf[VALUES]
    multiple = validation_conf.get(MULTIPLE, False)
    separator = validation_conf.get(SEPARATOR, None)

    if multiple:
        values = [v.strip() for v in str(value).split(separator)]
    else:
        values = [str(value).strip()]
    # Multiple values must be listed in alphabetical order.
    if sorted(values) != values:
        return False
    return all(value in choices for value in values)
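
# Example (the choice values are illustrative): multiple values must be in
# alphabetical order as well as within the allowed set.
#
#     >>> conf = {VALUES: ['algae', 'bacteria'], MULTIPLE: True, SEPARATOR: ';'}
#     >>> is_valid_choices('algae; bacteria', conf)
#     True
#     >>> is_valid_choices('bacteria; algae', conf)
#     False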

def is_valid_date(value, validation_conf=None):
    if value is None:
        return True
    if isinstance(value, datetime):
        year, month, day = value.year, value.month, value.day
    elif isinstance(value, int):
        year, month, day = value, None, None
    elif isinstance(value, str):
        value = value.replace('-', '').replace('/', '')
        month = None
        day = None
        try:
            year = int(value[:4])
            if len(value) >= 6:
                month = int(value[4:6])
            if len(value) >= 8:
                day = int(value[6:8])
        except (IndexError, TypeError, ValueError):
            return False
    else:
        return False

    if year < 1700 or year > datetime.now().year:
        return False
    if month is not None:
        if month < 1 or month > 12:
            return False
        if day is not None and (day < 1 or day > monthrange(year, month)[1]):
            return False
    return True
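
# Accepted shapes: datetime objects, bare years, and YYYY[MM[DD]] strings
# with optional "-" or "/" separators.
#
#     >>> is_valid_date(1998)
#     True
#     >>> is_valid_date('1998-06')
#     True
#     >>> is_valid_date('1998-06-31')
#     False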

def is_valid_dominio(value, validation_conf=None):
    # Checks "name; url" pairs: each site name must match the second-level
    # domain of the URL that follows it ("dominio" is Spanish for "domain").
    if value is None:
        return True
    try:
        items = [item.strip() for item in value.split(";")]
        if len(items) > 1:
            for i in range(0, len(items), 2):
                name_site = str(items[i])
                url_site = str(items[i + 1])
                domain = url_site.split(".")[-2]
                if name_site.lower() != domain:
                    return False
        return True
    except Exception:
        return False

def is_valid_title(value, validation_conf=None):
    if value is None:
        return True
    url_regex = r'^(http|https):\/\/[a-z0-9\-\.]+\.[a-z]{2,}([/a-z0-9\-\.]*)*$'
    try:
        items = [item.strip() for item in value.split(";")]
        if len(items) > 1:
            # Items come in "title; url" pairs: each title must not be empty
            # and must not itself look like a URL.
            for i in range(0, len(items), 2):
                name_site = items[i]
                if name_site == '' or re.match(url_regex, name_site):
                    return False
        return True
    except Exception:
        return False

def is_valid_url_title(value, validation_conf=None):
    if value is None:
        return True
    try:
        items = [item.strip() for item in value.split(";")]
        if len(items) == 1:
            response = requests.head(items[0], timeout=10)
            if response.status_code != 200:
                return False
        else:
            # Items come in "title; url" pairs: every URL must be reachable.
            for i in range(0, len(items), 2):
                response = requests.head(str(items[i + 1]), timeout=10)
                if response.status_code != 200:
                    return False
        return True
    except Exception:
        return False

def is_valid_url_dominio(value, validation_conf=None):
    if value is None:
        return True
    try:
        items = [item.strip() for item in value.split(";")]
        # Items come in "name; url" pairs: every URL must be reachable.
        for i in range(0, len(items), 2):
            response = requests.head(str(items[i + 1]), timeout=10)
            if response.status_code != 200:
                return False
        return True
    except Exception:
        return False

def is_valid_just_url(value, validation_conf=None):
    if value is None:
        return True
    try:
        # Every ";"-separated item must be a reachable URL.
        for url in (item.strip() for item in value.split(";")):
            response = requests.head(url, timeout=10)
            if response.status_code != 200:
                return False
        return True
    except Exception:
        return False

def is_valid_history(value, validation_conf=None):
    if value is None:
        return True
    # Each "<"-separated link of the history chain must match one of the
    # accepted "depositor, year"-style formats.
    regexes = (
        r"^[a-zA-Z &,;.'-]+, ((19|20)\d{2})",
        r"^[a-zA-Z &,;.'-]+, [a-zA-Z &,;.'-] (19|20)\d{2}\s\([a-zA-Z &,;.'-]+\)",
        r"^[a-zA-Z &,;.'-]+, [a-zA-Z &,;.'-]",
        r"^[a-zA-Z &,;.'-]+, (19|20)\d{2}\s\([a-zA-Z .',;&-]+\)",
        r"^[a-zA-Z &,;.'-]+, \([a-zA-Z &,;.'-]+\) (19|20)\d{2}",
    )
    try:
        items = [item.strip() for item in value.split("<")]
        for item in items:
            if not any(re.match(regex, item) for regex in regexes):
                return False
        return True
    except Exception:
        return False

def is_valid_coords(value, validation_conf=None):
    if value is None:
        return True
    try:
        # Either "lat;lon" or "lat;lon;precision;uncertainty" (with "?" for
        # unknown numeric fields); latitude within [-90, 90] and longitude
        # within [-180, 180].
        regex1 = r'^-?(90(\.0+)?|[1-8]?\d(\.\d+)?);-?(180(\.0+)?|((1[0-7]\d)|(\d{1,2}))(\.\d+)?)$'
        regex2 = r'^-?(90(\.0+)?|[1-8]?\d(\.\d+)?);-?(180(\.0+)?|((1[0-7]\d)|(\d{1,2}))(\.\d+)?);(\d+\.\d+|\?);(\d+\.\d+|\?)$|^(\d+\.\d+|\?)$|^;$'
        return bool(re.match(regex1, value) or re.match(regex2, value))
    except Exception:
        return False
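
# For example:
#
#     >>> is_valid_coords('40.4;-3.7')
#     True
#     >>> is_valid_coords('95.0;10.0')    # latitude out of range
#     False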

def is_valid_missing(value, validation_conf=None):
    return value is not None

def is_valid_taxon(value, validation_conf=None):
    if value is None:
        return True
    multiple = validation_conf.get(MULTIPLE, False)
    separator = validation_conf.get(SEPARATOR, ';')

    taxa = value.split(separator) if multiple else [value]
    return all(_is_valid_taxon(taxon.strip()) for taxon in taxa)

def _is_valid_taxon(value):
    value = value.strip()
    if not value:
        return True

    items = re.split(r" +", value)
    if len(items) > 1:
        species = items[1]
        # A bare "sp." placeholder instead of a species epithet is rejected.
        if species in ("sp", "spp", ".sp", "sp."):
            return False

    if len(items) > 2:
        # Subtaxa come in "rank name" pairs after "genus species".
        for index in range(0, len(items[2:]), 2):
            rank = SUBTAXAS.get(items[index + 2], None)
            if rank is None:
                return False
    return True
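
# For example:
#
#     >>> _is_valid_taxon('Aspergillus niger')
#     True
#     >>> _is_valid_taxon('Candida sp.')
#     False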

def is_valid_unique(value, validation_conf):
    if not value:
        return True
    label = validation_conf['label']
    shown_values = validation_conf['shown_values']
    if label not in shown_values:
        shown_values[label] = {}

    already_in_file = shown_values[label]
    if value in already_in_file:
        return False

    # Record the value (the dict is used as a set) so that any later
    # occurrence of the same value in this column is reported as a duplicate.
    shown_values[label][value] = None
    return True
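
# The shared shown_values dict carries state across calls, so a repeated
# value in the same column fails the second time (the label is illustrative):
#
#     >>> conf = {'label': 'Accession number', 'shown_values': {}}
#     >>> is_valid_unique('MIRRI0001', conf)
#     True
#     >>> is_valid_unique('MIRRI0001', conf)
#     False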

def is_valid_file(path):
    try:
        with path.open("rb") as fhand:
            error_log = validate_mirri_excel(fhand)
            if "EXL" in error_log.get_errors():
                return False
    except Exception:
        return False
    return True

VALIDATION_FUNCTIONS = {
    MISSING: is_valid_missing,
    REGEXP: is_valid_regex,
    CHOICES: is_valid_choices,
    CROSSREF: is_valid_crossrefs,
    DATE: is_valid_date,
    COORDINATES: is_valid_coords,
    TAXON: is_valid_taxon,
    TITLE: is_valid_title,
    DOMINIO: is_valid_dominio,
    URL_TITLE: is_valid_url_title,
    URL_DOMINIO: is_valid_url_dominio,
    JUST_URL: is_valid_just_url,
    ISO: validate_country_code,
    HISTORY: is_valid_history,
    VERSION: version,
    UNIQUE: is_valid_unique,
}

def validate_value(value, step_conf):
    kind = step_conf[TYPE]
    try:
        is_value_valid = VALIDATION_FUNCTIONS[kind]
    except KeyError as error:
        msg = f'This validation type {kind} is not implemented'
        raise NotImplementedError(msg) from error

    error_code = step_conf[ERROR_CODE]
    if not is_value_valid(value, step_conf):
        return error_code
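
# Dispatch sketch (the error code is illustrative): a step of type DATE
# returns its configured code when the value does not parse as a date.
#
#     >>> validate_value('not-a-date', {TYPE: DATE, ERROR_CODE: 'STD05'})
#     'STD05'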