"""
Service for extracting clinical entities with NER model
Manages extraction using the Text2NER model from pacovalentino/Text2NER
"""
import torch
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime
import re
logger = logging.getLogger(__name__)
[docs]
class NERService:
"""
Service for extracting clinical entities with NER model pacovalentino/Text2NER
"""
[docs]
def __init__(self):
"""Initialize the NER service"""
self.model_path = "pacovalentino/Text2NER"
self.ner_pipeline = None
self.available = False
self._initialize_model()
def _initialize_model(self):
    """Load tokenizer and model and build the NER pipeline.

    On success ``self.ner_pipeline`` holds a ``"ner"`` pipeline created with
    ``aggregation_strategy="simple"`` and ``self.available`` is set to True.
    Any ``Exception`` raised while importing or loading is logged and leaves
    ``self.available`` set to False; it is not re-raised.
    """
    try:
        # Imported here so that a missing `transformers` package is caught
        # by the ImportError handler below instead of failing at import time.
        from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

        logger.info(f"Caricamento modello NER: {self.model_path}")

        # Conservative loading options shared by tokenizer and model.
        load_options = {"trust_remote_code": False, "local_files_only": False}
        tokenizer = AutoTokenizer.from_pretrained(self.model_path, **load_options)
        model = AutoModelForTokenClassification.from_pretrained(
            self.model_path,
            torch_dtype="auto",
            **load_options,
        )
        model.eval()

        # Device index 0 when CUDA is available, otherwise -1 (CPU).
        if torch.cuda.is_available():
            device = 0
        else:
            device = -1

        self.ner_pipeline = pipeline(
            "ner",
            model=model,
            tokenizer=tokenizer,
            aggregation_strategy="simple",
            device=device,
        )
        self.available = True
        logger.info("Modello NER caricato con successo")
    except ImportError as e:
        logger.error(f"Dipendenze mancanti per NER: {str(e)}")
        self.available = False
    except Exception as e:
        logger.error(f"Errore durante il caricamento del modello NER: {str(e)}")
        logger.info("NER non disponibile - verrà usato fallback")
        self.available = False
[docs]
def test_connection(self) -> Dict[str, Any]:
    """
    Test the availability of the NER model

    When the model is loaded, the pipeline is run on a short Italian sample
    sentence and the number of returned entities is reported.

    :returns: Dictionary with ``success``, a ``config`` sub-dictionary
        (``model_path``, ``cuda_available``, ``model_loaded``, ``device``) and
        either ``test_entities_found`` (success) or ``error`` (failure)
    :rtype: Dict[str, Any]
    """
    def build_config(model_loaded: bool, device: str) -> Dict[str, Any]:
        # Single place that assembles the diagnostic configuration block.
        return {
            'model_path': self.model_path,
            'cuda_available': torch.cuda.is_available(),
            'model_loaded': model_loaded,
            'device': device
        }

    if not self.available:
        return {
            'success': False,
            'error': 'Modello NER non disponibile - utilizzare metodo LLM',
            'config': build_config(False, 'none')
        }

    try:
        # Simple smoke test with a sample sentence.
        test_text = "Il paziente Mario Rossi, maschio, 58 anni, presentava SpO₂ 91%."
        results = self.ner_pipeline(test_text)

        # Determine the device defensively: prefer the pipeline attribute,
        # then the model attribute, otherwise report 'unknown'.
        device_info = 'unknown'
        try:
            if hasattr(self.ner_pipeline, 'device'):
                device_info = str(self.ner_pipeline.device)
            elif hasattr(self.ner_pipeline, 'model') and hasattr(self.ner_pipeline.model, 'device'):
                device_info = str(self.ner_pipeline.model.device)
        except Exception:
            # Narrowed from a bare ``except`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            device_info = 'cpu' if not torch.cuda.is_available() else 'auto'

        return {
            'success': True,
            'test_entities_found': len(results),
            'config': build_config(True, device_info)
        }
    except Exception as e:
        logger.error(f"Errore test modello NER: {str(e)}")
        return {
            'success': False,
            'error': f"Test fallito: {str(e)}",
            'config': build_config(self.available, 'error')
        }
def _split_text_into_sentences(self, text: str) -> List[str]:
"""
Split the text into sentences using appropriate delimiters for the medical context
:param str text: Text to split
:type text: str
:return: List of sentences
:rtype: List[str]
"""
import re
# Pattern per identificare fine frase nel contesto medico
# Considera punti, punti esclamativi, punti interrogativi e due punti seguiti da spazio/newline
sentence_delimiters = r'[.!?:]\s+|[.!?:]$'
# Splitta il testo mantenendo i delimitatori
sentences = re.split(sentence_delimiters, text.strip())
# Filtra frasi vuote o troppo corte
filtered_sentences = []
for sentence in sentences:
cleaned = sentence.strip()
if cleaned and len(cleaned) > 3: # Frasi di almeno 4 caratteri
filtered_sentences.append(cleaned)
logger.debug(f"Frasi estratte: {len(filtered_sentences)} frasi")
return filtered_sentences
def _map_ner_to_clinical_fields_aggregated(self, ner_results: List[Dict], transcript_text: str) -> Dict[str, Any]:
"""
Map the NER entities to standard clinical fields with aggregation by type
Entities of the same type are joined with a comma
:param ner_results: NER model results from all sentences
:type ner_results: List[Dict]
:param transcript_text: Original text for context
:type transcript_text: str
:return: Dictionary with mapped and aggregated clinical fields
:rtype: Dict[str, Any]
"""
from collections import defaultdict
# Raggruppa entità per tipo
entities_by_type = defaultdict(list)
for entity in ner_results:
entity_text = entity['word'].strip()
label = entity['entity_group']
entities_by_type[label].append(entity_text)
logger.debug(f"Entità raggruppate per tipo: {len(entities_by_type)} tipi")
# Inizializza tutti i campi vuoti
clinical_data = {
'first_name': '',
'last_name': '',
'access_mode': '',
'birth_date': '',
'birth_place': '',
'age': '',
'gender': '',
'residence_city': '',
'residence_address': '',
'phone': '',
'skin_state': '',
'consciousness_state': '',
'pupils_state': '',
'respiratory_state': '',
'history': '',
'medications_taken': '',
'symptoms': '',
'heart_rate': '',
'oxygenation': '',
'blood_pressure': '',
'temperature': '',
'blood_glucose': '',
'medical_actions': '',
'assessment': '',
'plan': '',
'triage_code': ''
}
# Mappa le entità aggregate ai campi
for label, entity_texts in entities_by_type.items():
# Rimuovi duplicati mantenendo l'ordine
unique_texts = []
for text in entity_texts:
if text not in unique_texts:
unique_texts.append(text)
# Mapping delle entità NER ai campi clinici con aggregazione
if label == 'NOME_COGNOME':
# Per nome e cognome, prova a separare il primo elemento
if unique_texts:
parts = unique_texts[0].split()
if len(parts) >= 2:
clinical_data['first_name'] = parts[0]
clinical_data['last_name'] = ' '.join(parts[1:])
else:
clinical_data['first_name'] = unique_texts[0]
elif label == 'SESSO':
# Per il sesso, prendi il primo valore
if unique_texts:
clinical_data['gender'] = self._normalize_gender(unique_texts[0])
elif label == 'DATA_NASCITA':
# Per la data di nascita, prendi il primo valore
if unique_texts:
clinical_data['birth_date'] = self._normalize_date(unique_texts[0])
elif label == 'LUOGO_NASCITA':
# Unisci tutti i luoghi di nascita
clinical_data['birth_place'] = ', '.join(unique_texts)
elif label == 'COMUNE_RESIDENZA':
# Unisci tutte le città di residenza
clinical_data['residence_city'] = ', '.join(unique_texts)
elif label == 'VIA_RESIDENZA':
# Unisci tutti gli indirizzi
clinical_data['residence_address'] = ', '.join(unique_texts)
elif label in ['TELEFONO', 'NUMERO_TELEFONO']:
# Unisci tutti i telefoni (supporta entrambe le etichette)
clinical_data['phone'] = ', '.join(unique_texts)
elif label == 'FC_BPM':
# Per frequenza cardiaca, mantieni l'unità di misura
if unique_texts:
clinical_data['heart_rate'] = self._extract_with_units(unique_texts, 'bpm')
elif label == 'SpO2':
# Per saturazione, mantieni l'unità di misura
if unique_texts:
clinical_data['oxygenation'] = self._extract_with_units(unique_texts, '%')
elif label == 'PA_MMHG':
# Per pressione, mantieni l'unità di misura
if unique_texts:
clinical_data['blood_pressure'] = self._extract_with_units(unique_texts, 'mmHg')
elif label == 'TEMPERATURA':
# Per temperatura, mantieni l'unità di misura
if unique_texts:
clinical_data['temperature'] = self._extract_with_units(unique_texts, '°C')
elif label == 'GLICEMIA':
# Per glicemia, mantieni l'unità di misura
if unique_texts:
clinical_data['blood_glucose'] = self._extract_with_units(unique_texts, 'mg/dl')
elif label == 'CUTE':
# Unisci tutti gli stati della cute
clinical_data['skin_state'] = ', '.join(unique_texts)
elif label == 'COSCIENZA':
# Unisci tutti gli stati di coscienza
clinical_data['consciousness_state'] = ', '.join(unique_texts)
elif label in ['PUPILLE_TIPO_DX', 'PUPILLE_TIPO_SX', 'PUPILLE_REATTIVITA']:
# Unisci tutte le informazioni sulle pupille
if clinical_data['pupils_state']:
clinical_data['pupils_state'] += ', ' + ', '.join(unique_texts)
else:
clinical_data['pupils_state'] = ', '.join(unique_texts)
elif label == 'RESPIRO':
# Unisci tutti gli stati respiratori
clinical_data['respiratory_state'] = ', '.join(unique_texts)
elif label == 'MEDICINA':
# Unisci tutti i farmaci
clinical_data['medications_taken'] = ', '.join(unique_texts)
elif label == 'CONDIZIONE_RIFERITA':
# Unisci tutti i sintomi
clinical_data['symptoms'] = ', '.join(unique_texts)
elif label in ['PROVVEDIMENTI_ALTRO', 'PROVVEDIMENTI_CIRCOLO',
'PROVVEDIMENTI_IMMOBILIZZAZIONE', 'PROVVEDIMENTI_RESPIRO']:
# Unisci tutte le azioni mediche
if clinical_data['medical_actions']:
clinical_data['medical_actions'] += ', ' + ', '.join(unique_texts)
else:
clinical_data['medical_actions'] = ', '.join(unique_texts)
elif label == 'CODICE_USCITA':
# Per codice triage, prendi il primo valore valido
if unique_texts:
triage_mapping = {
'rosso': 'rosso',
'giallo': 'giallo',
'verde': 'verde',
'bianco': 'bianco',
'nero': 'nero'
}
normalized_code = unique_texts[0].lower()
clinical_data['triage_code'] = triage_mapping.get(normalized_code, '')
# Mapping addizionali per etichette che potrebbero essere varianti
elif label in ['ETA', 'AGE', 'ANNI']:
# Età del paziente
clinical_data['age'] = ', '.join(unique_texts)
elif label in ['ANAMNESI', 'STORIA_CLINICA', 'HISTORY']:
# Storia clinica/anamnesi
clinical_data['history'] = ', '.join(unique_texts)
elif label in ['VALUTAZIONE', 'ASSESSMENT', 'DIAGNOSI']:
# Valutazione clinica
clinical_data['assessment'] = ', '.join(unique_texts)
elif label in ['PIANO', 'PLAN', 'TERAPIA', 'TRATTAMENTO']:
# Piano terapeutico
clinical_data['plan'] = ', '.join(unique_texts)
elif label in ['MODALITA_ACCESSO', 'ACCESS_MODE', 'ARRIVO']:
# Modalità di accesso
clinical_data['access_mode'] = ', '.join(unique_texts)
# Caso di default per etichette non riconosciute
else:
logger.warning(f"Etichetta NER non mappata: '{label}' con valore: {unique_texts}")
return clinical_data
def _extract_with_units(self, entity_texts: List[str], default_unit: str) -> str:
"""
Extract numeric values while keeping the unit of measurement if present.
Correctly handles spaces in values like "120 / 70"
:param entity_texts: List of entity texts to analyze
:type entity_texts: List[str]
:param default_unit: Default unit to use if none found
:type default_unit: str
:return: Extracted value with unit
:rtype: str
"""
import re
for text in entity_texts:
text_clean = text.strip()
# Pattern speciali per temperatura con spazi e punti anomali
if default_unit.lower() == '°c':
temp_patterns = [
r'(\d+)\.\s*(\d+)\s*(gradi|°c?|celsius)', # "36. 8 gradi"
r'(\d+),\s*(\d+)\s*(gradi|°c?|celsius)', # "36, 8 gradi"
r'(\d+(?:[.,]\d+)?)\s*(gradi|°c?|celsius)', # "36.8 gradi" normale
r'(\d+(?:[.,]\d+)?)', # solo numero
]
for pattern in temp_patterns:
match = re.search(pattern, text_clean.lower())
if match:
if len(match.groups()) >= 2 and match.group(2) and match.group(2).isdigit():
# Caso "36. 8" o "36, 8" - ricostruisci il numero
temp_value = f"{match.group(1)}.{match.group(2)}"
else:
# Caso normale "36.8"
temp_value = match.group(1).replace(',', '.')
try:
# Valida che sia una temperatura ragionevole
temp_float = float(temp_value)
if 30 <= temp_float <= 45: # Range temperatura corporea ragionevole
return f"{temp_value} {default_unit}"
except ValueError:
continue
# Pattern speciali per pressione arteriosa con spazi
if default_unit.lower() == 'mmhg':
bp_patterns = [
r'(\d+)\s*/\s*(\d+)\s*([a-zA-Z]+)?', # "120 / 70" o "120/70 mmHg"
r'(\d+)\s*su\s*(\d+)\s*([a-zA-Z]+)?', # "120 su 70"
r'(\d+)\s*-\s*(\d+)\s*([a-zA-Z]+)?', # "120 - 70"
]
for pattern in bp_patterns:
match = re.search(pattern, text_clean)
if match:
systolic = match.group(1)
diastolic = match.group(2)
unit = match.group(3) if match.group(3) else default_unit
# Normalizza unità pressione
if unit.lower() in ['mmhg', 'mm', 'hg']:
unit = 'mmHg'
else:
unit = default_unit
return f"{systolic}/{diastolic} {unit}"
# Pattern per valori con spazi "95 %" o "120 bpm"
patterns = [
r'(\d+(?:[.,]\d+)?)\s*([a-zA-Z/%°]+)', # numero + spazio + unità
r'(\d+(?:[.,]\d+)?)\s*([%°])', # numero + spazio + simbolo
r'(\d+(?:[.,]\d+)?)\s+([a-zA-Z]+)', # numero + spazi + unità
r'(\d+(?:[.,]\d+)?)', # solo numero
]
for pattern in patterns:
match = re.search(pattern, text_clean)
if match:
value = match.group(1).replace(',', '.')
unit = match.group(2) if len(match.groups()) > 1 and match.group(2) else default_unit
# Normalizza alcune unità comuni
unit_mapping = {
'bpm': 'bpm',
'battiti': 'bpm',
'beat': 'bpm',
'beats': 'bpm',
'%': '%',
'percento': '%',
'percent': '%',
'°c': '°C',
'°': '°C',
'gradi': '°C',
'celsius': '°C',
'c': '°C',
'mmhg': 'mmHg',
'mm': 'mmHg',
'hg': 'mmHg',
'mg/dl': 'mg/dl',
'mgdl': 'mg/dl',
'mg': 'mg/dl'
}
normalized_unit = unit_mapping.get(unit.lower(), unit)
return f"{value} {normalized_unit}"
return ', '.join(entity_texts) # Fallback: unisci tutto
def _map_ner_to_clinical_fields(self, ner_results: List[Dict], transcript_text: str) -> Dict[str, Any]:
"""
Map the NER entities to standard clinical fields
:param ner_results: Risultati del modello NER
:type ner_results: List[Dict]
:param transcript_text: Testo originale per contesto
:type transcript_text: str
:return: Dizionario con campi clinici mappati
:rtype: Dict[str, Any]
"""
# Inizializza tutti i campi vuoti
clinical_data = {
'first_name': '',
'last_name': '',
'access_mode': '',
'birth_date': '',
'birth_place': '',
'age': '',
'gender': '',
'residence_city': '',
'residence_address': '',
'phone': '',
'skin_state': '',
'consciousness_state': '',
'pupils_state': '',
'respiratory_state': '',
'history': '',
'medications_taken': '',
'symptoms': '',
'heart_rate': '',
'oxygenation': '',
'blood_pressure': '',
'temperature': '',
'blood_glucose': '',
'medical_actions': '',
'assessment': '',
'plan': '',
'triage_code': ''
}
# Mappa le entità NER ai campi
for entity in ner_results:
entity_text = entity['word'].strip()
label = entity['entity_group']
# Mapping delle entità NER ai campi clinici
if label == 'NOME_COGNOME':
# Prova a separare nome e cognome
parts = entity_text.split()
if len(parts) >= 2:
clinical_data['first_name'] = parts[0]
clinical_data['last_name'] = ' '.join(parts[1:])
else:
clinical_data['first_name'] = entity_text
elif label == 'SESSO':
clinical_data['gender'] = self._normalize_gender(entity_text)
elif label == 'DATA_NASCITA':
clinical_data['birth_date'] = self._normalize_date(entity_text)
elif label == 'LUOGO_NASCITA':
clinical_data['birth_place'] = entity_text
elif label == 'COMUNE_RESIDENZA':
clinical_data['residence_city'] = entity_text
elif label == 'VIA_RESIDENZA':
if clinical_data['residence_address']:
clinical_data['residence_address'] += f", {entity_text}"
else:
clinical_data['residence_address'] = entity_text
elif label == 'NUMERO_RESIDENZA':
if clinical_data['residence_address']:
clinical_data['residence_address'] += f" {entity_text}"
else:
clinical_data['residence_address'] = entity_text
elif label == 'NUMERO_TELEFONO':
clinical_data['phone'] = entity_text
elif label == 'FC_BPM':
# Usa la normalizzazione robusta per frequenza cardiaca
hr_value, _ = self._extract_numeric_with_unit(
entity_text,
['bpm', 'battiti', 'beat'],
(30, 250)
)
if hr_value:
clinical_data['heart_rate'] = str(int(hr_value))
else:
# Fallback con estrazione semplice
match = re.search(r'(\d{2,3})', entity_text)
if match:
clinical_data['heart_rate'] = match.group(1)
elif label == 'SpO2':
# Usa la normalizzazione robusta per saturazione
sat_value, _ = self._extract_numeric_with_unit(
entity_text,
['%', 'percento', 'spo2'],
(50, 100)
)
if sat_value:
clinical_data['oxygenation'] = str(int(sat_value))
else:
# Fallback
match = re.search(r'(\d{1,3})', entity_text)
if match and 50 <= int(match.group(1)) <= 100:
clinical_data['oxygenation'] = match.group(1)
elif label == 'PA_MMHG':
# Normalizzazione robusta per pressione arteriosa
bp_text = entity_text.lower()
bp_patterns = [
r'(\d{2,3})[\/\-](\d{2,3})',
r'(\d{2,3})\s*su\s*(\d{2,3})',
r'sistolica[:\s]*(\d{2,3}).*diastolica[:\s]*(\d{2,3})'
]
for pattern in bp_patterns:
match = re.search(pattern, bp_text)
if match:
try:
systolic, diastolic = int(match.group(1)), int(match.group(2))
if 50 <= systolic <= 250 and 30 <= diastolic <= 150 and systolic > diastolic:
clinical_data['blood_pressure'] = f"{systolic}/{diastolic}"
break
except:
continue
elif label == 'TEMPERATURA':
# Normalizzazione robusta per temperatura
temp_value, _ = self._extract_numeric_with_unit(
entity_text,
['°c', '°', 'gradi', 'celsius'],
(30, 45)
)
if temp_value:
clinical_data['temperature'] = round(temp_value, 1)
else:
# Fallback
match = re.search(r'(\d{1,2}[.,]\d{1,2})', entity_text.replace(',', '.'))
if match:
try:
temp = float(match.group(1))
if 30 <= temp <= 45:
clinical_data['temperature'] = round(temp, 1)
except:
pass
elif label == 'GLICEMIA':
# Normalizzazione robusta per glicemia
glucose_value, unit = self._extract_numeric_with_unit(
entity_text,
['mg/dl', 'mg', 'mmol/l', 'glicemia'],
(30, 600)
)
if glucose_value:
# Conversione da mmol/l a mg/dl se necessario
if unit and 'mmol' in unit:
glucose_value = glucose_value * 18.0
clinical_data['blood_glucose'] = str(int(glucose_value))
else:
# Fallback
match = re.search(r'(\d{2,3})', entity_text)
if match and 30 <= int(match.group(1)) <= 600:
clinical_data['blood_glucose'] = match.group(1)
elif label == 'CUTE':
clinical_data['skin_state'] = entity_text
elif label == 'COSCIENZA':
clinical_data['consciousness_state'] = entity_text
elif label in ['PUPILLE_TIPO_DX', 'PUPILLE_TIPO_SX', 'PUPILLE_REATTIVITA']:
if clinical_data['pupils_state']:
clinical_data['pupils_state'] += f", {entity_text}"
else:
clinical_data['pupils_state'] = entity_text
elif label == 'RESPIRO':
clinical_data['respiratory_state'] = entity_text
elif label == 'MEDICINA':
if clinical_data['medications_taken']:
clinical_data['medications_taken'] += f", {entity_text}"
else:
clinical_data['medications_taken'] = entity_text
elif label == 'CONDIZIONE_RIFERITA':
if clinical_data['symptoms']:
clinical_data['symptoms'] += f", {entity_text}"
else:
clinical_data['symptoms'] = entity_text
elif label in ['PROVVEDIMENTI_ALTRO', 'PROVVEDIMENTI_CIRCOLO',
'PROVVEDIMENTI_IMMOBILIZZAZIONE', 'PROVVEDIMENTI_RESPIRO']:
if clinical_data['medical_actions']:
clinical_data['medical_actions'] += f", {entity_text}"
else:
clinical_data['medical_actions'] = entity_text
elif label == 'CODICE_USCITA':
# Mappa i codici uscita ai codici triage
triage_mapping = {
'rosso': 'rosso',
'giallo': 'giallo',
'verde': 'verde',
'bianco': 'bianco',
'nero': 'nero'
}
normalized_code = entity_text.lower()
clinical_data['triage_code'] = triage_mapping.get(normalized_code, '')
return clinical_data
def _normalize_gender(self, text: str) -> str:
"""Normalize the gender field with robust mapping
:param str text: Input text
:type text: str
:return: Normalized gender
:rtype: str
"""
text_lower = text.lower().strip()
# Mappatura completa per genere maschile
male_variants = [
'm', 'maschio', 'maschile', 'male', 'uomo', 'man', 'boy', 'ragazzo',
'masculine', 'masculino', 'homme', 'hombre'
]
# Mappatura completa per genere femminile
female_variants = [
'f', 'femmina', 'femminile', 'female', 'donna', 'woman', 'girl', 'ragazza',
'feminine', 'feminino', 'femme', 'mujer'
]
# Controlla match esatto prima (più specifico)
if text_lower in male_variants:
return 'M'
elif text_lower in female_variants:
return 'F'
# Poi controlla se contiene le varianti (solo per parole lunghe > 1 carattere)
elif any(variant in text_lower for variant in male_variants if len(variant) > 1):
return 'M'
elif any(variant in text_lower for variant in female_variants if len(variant) > 1):
return 'F'
else:
return 'O' # Altro/Non specificato
def _normalize_date(self, text: str) -> str:
"""Normalize a date into the YYYY-MM-DD format with support for Italian formats
:param str text: Input date text
:type text: str
:return: Normalized date or original text if parsing fails
:rtype: str
"""
import calendar
# Mappatura dei mesi italiani
italian_months = {
'gennaio': 1, 'gen': 1,
'febbraio': 2, 'feb': 2,
'marzo': 3, 'mar': 3,
'aprile': 4, 'apr': 4,
'maggio': 5, 'mag': 5,
'giugno': 6, 'giu': 6,
'luglio': 7, 'lug': 7,
'agosto': 8, 'ago': 8,
'settembre': 9, 'set': 9, 'sett': 9,
'ottobre': 10, 'ott': 10,
'novembre': 11, 'nov': 11,
'dicembre': 12, 'dic': 12
}
text = text.strip().lower()
# Pattern per date numeriche standard
date_patterns = [
r'(\d{1,2})[\/\-\.](\d{1,2})[\/\-\.](\d{4})', # dd/mm/yyyy
r'(\d{4})[\/\-\.](\d{1,2})[\/\-\.](\d{1,2})', # yyyy/mm/dd
r'(\d{1,2})[\/\-\.](\d{1,2})[\/\-\.](\d{2})', # dd/mm/yy
]
for pattern in date_patterns:
match = re.search(pattern, text)
if match:
part1, part2, part3 = match.groups()
# Determina il formato in base alla lunghezza del terzo gruppo
if len(part3) == 4: # Anno completo
if int(part3) > 1900: # yyyy/mm/dd
year, month, day = part3, part1, part2
else: # dd/mm/yyyy
day, month, year = part1, part2, part3
else: # dd/mm/yy
day, month, year = part1, part2, f"19{part3}" if int(part3) > 30 else f"20{part3}"
try:
return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
except:
continue
# Pattern per date con mesi in italiano: "23 febbraio 1990"
month_pattern = r'(\d{1,2})\s+(\w+)\s+(\d{4})'
match = re.search(month_pattern, text)
if match:
day, month_name, year = match.groups()
# Cerca il mese nella mappatura italiana
month_num = None
for italian_month, num in italian_months.items():
if italian_month in month_name.lower():
month_num = num
break
if month_num:
try:
return f"{year}-{str(month_num).zfill(2)}-{day.zfill(2)}"
except:
pass
# Pattern per date con mesi abbreviati: "23 feb 1990", "feb 23, 1990"
abbrev_patterns = [
r'(\d{1,2})\s+(\w{3,4})\s+(\d{4})', # 23 feb 1990
r'(\w{3,4})\s+(\d{1,2}),?\s+(\d{4})', # feb 23, 1990
]
for i, pattern in enumerate(abbrev_patterns):
match = re.search(pattern, text)
if match:
if i == 0: # dd mmm yyyy
day, month_abbr, year = match.groups()
else: # mmm dd, yyyy
month_abbr, day, year = match.groups()
# Cerca il mese nella mappatura italiana
month_num = None
for italian_month, num in italian_months.items():
if month_abbr.lower() in italian_month or italian_month.startswith(month_abbr.lower()):
month_num = num
break
if month_num:
try:
return f"{year}-{str(month_num).zfill(2)}-{day.zfill(2)}"
except:
continue
# Se non riesce a parsare, restituisce il testo originale
return text
def _normalize_fields_with_units(self, data: Dict[str, Any], usage_mode: str = "") -> Dict[str, Any]:
"""
Normalize the extracted fields while preserving units of measurement where appropriate
:param dict data: Extracted data to normalize
:type data: Dict[str, Any]
:param str usage_mode: Usage mode
:type usage_mode: str
:return: Normalized data with preserved units of measurement
:rtype: Dict[str, Any]
"""
normalized = data.copy()
null_values = {"unknown", "na", "n/a", "null", "none", "sconosciuto", ""}
# Rimuove valori considerati nulli
for key, value in normalized.items():
if isinstance(value, str) and value.strip().lower() in null_values:
normalized[key] = ""
# Per i parametri vitali, mantieni le unità se già presenti o aggiungile se mancano
vital_signs_with_units = {
'heart_rate': 'bpm',
'oxygenation': '%',
'temperature': '°C',
'blood_glucose': 'mg/dl'
}
for field, default_unit in vital_signs_with_units.items():
if data.get(field):
value_str = str(data[field]).strip()
if value_str:
# Se il valore ha già un'unità, mantienilo
if any(unit in value_str.lower() for unit in ['bpm', '%', '°c', '°', 'mg/dl', 'mmhg']):
normalized[field] = value_str
else:
# Se è solo un numero, aggiungi l'unità di default
import re
number_match = re.search(r'(\d+(?:[.,]\d+)?)', value_str)
if number_match:
number = number_match.group(1)
normalized[field] = f"{number} {default_unit}"
else:
normalized[field] = value_str
# Per la pressione arteriosa, gestisci il formato speciale
if data.get("blood_pressure"):
bp_value = str(data["blood_pressure"]).strip()
if bp_value:
# Se contiene già mmHg, mantienilo
if 'mmhg' not in bp_value.lower():
# Se è un formato sistolica/diastolica, aggiungi mmHg
import re
if re.search(r'\d+[/\-]\d+', bp_value):
normalized["blood_pressure"] = f"{bp_value} mmHg"
else:
normalized["blood_pressure"] = bp_value
else:
normalized["blood_pressure"] = bp_value
# Se modalità Checkup, mantieni solo campi specifici
if usage_mode == "Checkup":
fields_to_keep = {
"first_name", "last_name", "medications_taken",
"heart_rate", "oxygenation", "blood_pressure",
"temperature", "blood_glucose", "medical_actions",
"assessment", "plan", "symptoms"
}
normalized = {k: v for k, v in normalized.items() if k in fields_to_keep}
return normalized
def _normalize_fields(self, data: Dict[str, Any], usage_mode: str = "") -> Dict[str, Any]:
"""
Normalize the extracted fields while preserving units of measurement where appropriate
:param dict data: Extracted data to normalize
:type data: Dict[str, Any]
:param str usage_mode: Usage mode
:type usage_mode: str
:return: Normalized data with preserved units of measurement
:rtype: Dict[str, Any]
"""
normalized = data.copy()
null_values = {"unknown", "na", "n/a", "null", "none", "sconosciuto", ""}
# Rimuove valori considerati nulli
for key, value in normalized.items():
if isinstance(value, str) and value.strip().lower() in null_values:
normalized[key] = ""
# Normalizzazione frequenza cardiaca - supporta "bpm", "battiti", etc.
if data.get("heart_rate"):
hr_text = str(data["heart_rate"]).lower()
# Pattern più robusti per FC
hr_patterns = [
r"(\d{2,3})\s*(?:bpm|battiti|beat|pulsazioni|bat)",
r"fc[:\s]*(\d{2,3})",
r"frequenza[:\s]+cardiaca[:\s]*(\d{2,3})",
r"(\d{2,3})\s*(?:al|per)\s*minuto",
r"\b(\d{2,3})\b" # Fallback per solo numero
]
hr_value = None
for pattern in hr_patterns:
match = re.search(pattern, hr_text)
if match:
hr_value = int(match.group(1))
if 30 <= hr_value <= 250: # Range fisiologico
break
normalized["heart_rate"] = str(hr_value) if hr_value else ""
# Normalizzazione saturazione ossigeno - supporta "%" e varianti
if data.get("oxygenation"):
sat_text = str(data["oxygenation"]).lower()
# Pattern per saturazione
sat_patterns = [
r"(\d{1,3})\s*%",
r"spo2[:\s]*(\d{1,3})",
r"saturazione[:\s]*(\d{1,3})",
r"ossigeno[:\s]*(\d{1,3})",
r"\b(\d{2,3})\b" # Fallback per solo numero
]
sat_value = None
for pattern in sat_patterns:
match = re.search(pattern, sat_text)
if match:
sat_value = int(match.group(1))
if 50 <= sat_value <= 100: # Range fisiologico
break
normalized["oxygenation"] = str(sat_value) if sat_value else ""
# Normalizzazione temperatura - supporta "°C", "gradi", etc.
if data.get("temperature"):
temp_text = str(data["temperature"]).replace(",", ".").lower()
# Pattern per temperatura
temp_patterns = [
r"(\d{1,2}[.,]\d{1,2})\s*(?:°c|gradi|celsius)",
r"temperatura[:\s]*(\d{1,2}[.,]\d{1,2})",
r"(\d{1,2}[.,]\d{1,2})\s*°",
r"(\d{1,2}[.,]\d{1,2})\s*gradi",
r"(\d{1,2}[.,]\d{1,2})" # Fallback
]
temp_value = None
for pattern in temp_patterns:
match = re.search(pattern, temp_text)
if match:
try:
temp_value = float(match.group(1).replace(",", "."))
if 30 <= temp_value <= 45: # Range fisiologico
break
except:
continue
normalized["temperature"] = temp_value if temp_value else ""
# Normalizzazione glicemia - supporta "mg/dl", "mmol/l", etc.
if data.get("blood_glucose"):
glucose_text = str(data["blood_glucose"]).lower()
# Pattern per glicemia
glucose_patterns = [
r"(\d{2,3})\s*mg/dl",
r"(\d{2,3})\s*mg",
r"glicemia[:\s]*(\d{2,3})",
r"glucosio[:\s]*(\d{2,3})",
r"(\d{1,2}[.,]\d{1,2})\s*mmol/l", # Conversione da mmol/l
r"\b(\d{2,3})\b" # Fallback
]
glucose_value = None
for i, pattern in enumerate(glucose_patterns):
match = re.search(pattern, glucose_text)
if match:
try:
value = float(match.group(1).replace(",", "."))
# Se è in mmol/l, converti in mg/dl
if i == 4: # Pattern mmol/l
value = value * 18.0 # Conversione mmol/l -> mg/dl
if 30 <= value <= 600: # Range fisiologico esteso
glucose_value = int(value)
break
except:
continue
normalized["blood_glucose"] = str(glucose_value) if glucose_value else ""
# Normalizzazione pressione arteriosa - supporta vari formati
if data.get("blood_pressure"):
bp_text = str(data["blood_pressure"]).lower()
# Pattern per pressione arteriosa
bp_patterns = [
r"(\d{2,3})[\/\-](\d{2,3})\s*mmhg",
r"(\d{2,3})[\/\-](\d{2,3})",
r"pa[:\s]*(\d{2,3})[\/\-](\d{2,3})",
r"pressione[:\s]*(\d{2,3})[\/\-](\d{2,3})",
r"sistolica[:\s]*(\d{2,3}).*diastolica[:\s]*(\d{2,3})",
r"(\d{2,3})\s*su\s*(\d{2,3})",
r"(\d{2,3})\s*mmhg.*(\d{2,3})\s*mmhg"
]
bp_value = None
for pattern in bp_patterns:
match = re.search(pattern, bp_text)
if match:
try:
systolic = int(match.group(1))
diastolic = int(match.group(2))
# Verifica range fisiologici
if 50 <= systolic <= 250 and 30 <= diastolic <= 150 and systolic > diastolic:
bp_value = f"{systolic}/{diastolic}"
break
except:
continue
normalized["blood_pressure"] = bp_value if bp_value else ""
# Normalizzazione età - estrae numero dagli anni
if data.get("age"):
age_text = str(data["age"]).lower()
age_patterns = [
r"(\d{1,3})\s*anni",
r"(\d{1,3})\s*years",
r"età[:\s]*(\d{1,3})",
r"\b(\d{1,3})\b"
]
age_value = None
for pattern in age_patterns:
match = re.search(pattern, age_text)
if match:
age_val = int(match.group(1))
if 0 <= age_val <= 120: # Range ragionevole
age_value = age_val
break
normalized["age"] = age_value if age_value else ""
# Se modalità Checkup, mantieni solo campi specifici
if usage_mode == "Checkup":
fields_to_keep = {
"first_name", "last_name", "medications_taken",
"heart_rate", "oxygenation", "blood_pressure",
"temperature", "blood_glucose", "medical_actions",
"assessment", "plan", "symptoms"
}
for key in list(normalized.keys()):
if key not in fields_to_keep:
normalized[key] = ""
return normalized
def _extract_numeric_with_unit(self, text: str, expected_units: list, value_range: tuple = None) -> tuple:
"""
Extract a numeric value with unit of measurement from a text
:param str text: Text to analyze
:type text: str
:param list expected_units: List of expected units (e.g. ['bpm', 'battiti'])
:type expected_units: list
:param tuple value_range: Tuple (min, max) for range validation
:type value_range: tuple
:return: Tuple (value, unit) or (None, None) if not found
:rtype: tuple
"""
text_lower = text.lower().strip()
for unit in expected_units:
# Pattern per trovare numero + unità
patterns = [
rf"(\d+(?:[.,]\d+)?)\s*{re.escape(unit)}",
rf"{re.escape(unit)}[:\s]*(\d+(?:[.,]\d+)?)",
rf"(\d+(?:[.,]\d+)?)\s*{re.escape(unit[:3])}" # Abbreviazione
]
for pattern in patterns:
match = re.search(pattern, text_lower)
if match:
try:
value = float(match.group(1).replace(",", "."))
# Verifica range se specificato
if value_range and not (value_range[0] <= value <= value_range[1]):
continue
return value, unit
except ValueError:
continue
return None, None
def _parse_vital_signs_text(self, text: str) -> Dict[str, Any]:
"""
Analyze a text to extract all present vital signs
:param str text: Text containing vital parameters
:type text: str
:return: Dictionary with found parameters
:rtype: Dict[str, Any]
"""
vitals = {}
text_lower = text.lower()
# Pattern per riconoscere gruppi di parametri vitali
vital_patterns = {
'heart_rate': {
'patterns': [r'fc[:\s]*(\d{2,3})', r'(\d{2,3})\s*bpm', r'battiti[:\s]*(\d{2,3})'],
'range': (30, 250)
},
'blood_pressure': {
'patterns': [r'pa[:\s]*(\d{2,3})[\/\-](\d{2,3})', r'(\d{2,3})[\/\-](\d{2,3})\s*mmhg'],
'validator': lambda m: 50 <= int(m[0]) <= 250 and 30 <= int(m[1]) <= 150
},
'oxygenation': {
'patterns': [r'spo2[:\s]*(\d{1,3})', r'(\d{1,3})\s*%', r'saturazione[:\s]*(\d{1,3})'],
'range': (50, 100)
},
'temperature': {
'patterns': [r'(\d{1,2}[.,]\d{1,2})\s*°c?', r'temperatura[:\s]*(\d{1,2}[.,]\d{1,2})'],
'range': (30, 45)
}
}
for vital_name, config in vital_patterns.items():
for pattern in config['patterns']:
match = re.search(pattern, text_lower)
if match:
try:
if vital_name == 'blood_pressure':
if len(match.groups()) == 2 and config['validator'](match.groups()):
vitals[vital_name] = f"{match.group(1)}/{match.group(2)}"
else:
value = float(match.group(1).replace(",", "."))
if 'range' in config and config['range'][0] <= value <= config['range'][1]:
vitals[vital_name] = int(value) if value.is_integer() else value
break
except (ValueError, AttributeError):
continue
return vitals
def _validate_fields(self, data: Dict[str, Any], original_text: str) -> List[str]:
"""
Validate extracted fields against the original text
:param dict data: Extracted data to validate
:type data: Dict[str, Any]
:param str original_text: Original text for context
:type original_text: str
:return: List of validation error messages
:rtype: List[str]
"""
error_fields = []
original_text_lower = original_text.lower()
# Validazione nome
if data.get("first_name") and str(data["first_name"]).strip():
name_value = str(data["first_name"]).strip()
if len(name_value) < 2:
error_fields.append("first_name: nome troppo corto")
# Validazione cognome
if data.get("last_name") and str(data["last_name"]).strip():
surname_value = str(data["last_name"]).strip()
if len(surname_value) < 2:
error_fields.append("last_name: cognome troppo corto")
# Validazione temperatura
if data.get("temperature") and str(data["temperature"]).strip():
try:
temp_value = data["temperature"].split("°C")[0]
temp_value = float(temp_value)
if temp_value < 30 or temp_value > 45:
error_fields.append("temperature: valore fuori range normale (30-45°C)")
except:
error_fields.append("temperature: formato non valido")
return list(set(error_fields))
def _fallback_response(self, warning: Optional[str] = None) -> Dict[str, Any]:
"""Response fallback when the NER model is not available
:param str warning: Optional warning message
:type warning: str
:return: Fallback response payload
:rtype: Dict[str, Any]
"""
payload = {
'extracted_data': {},
'validation_errors': [],
'extraction_method': 'ner-fallback',
'model': self.model_path,
'entities_found': 0,
'raw_ner_results': []
}
if warning:
payload['warnings'] = [warning]
return payload
# Singleton instance of the NER service.
# NOTE: NERService.__init__ calls _initialize_model(), so importing this
# module attempts to load the model; failures are logged there, not raised.
ner_service = NERService()