|
|
|
|
|
""" |
|
|
Data Preprocessing Pipeline for News Dashboard |
|
|
Handles preprocessing of scraped content for translation, summarization, and other downstream operations.
|
|
""" |
|
|
|
|
|
import re |
|
|
import logging |
|
|
from typing import List, Dict, Any, Optional |
|
|
from datetime import datetime |
|
|
import hashlib |
|
|
import unicodedata |
|
|
from scraper_common import scraping_cancelled |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class DataPreprocessor: |
|
|
""" |
|
|
Data preprocessing pipeline for news dashboard content |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.cleaned_data = [] |
|
|
self.processing_stats = { |
|
|
'total_processed': 0, |
|
|
'successful_processing': 0, |
|
|
'failed_processing': 0, |
|
|
'content_issues': 0, |
|
|
'metadata_issues': 0 |
|
|
} |
|
|
|
|
|
def preprocess_all_data(self, raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main preprocessing function that processes all scraped data |
|
|
|
|
|
Args: |
|
|
raw_data: List of dictionaries containing scraped content |
|
|
|
|
|
Returns: |
|
|
List of preprocessed dictionaries ready for downstream operations |
|
|
""" |
|
|
logger.info(f"Starting preprocessing of {len(raw_data)} items") |
|
|
|
|
|
processed_data = [] |
|
|
|
|
|
for item in raw_data: |
|
|
|
|
|
if scraping_cancelled(): |
|
|
logger.warning("β οΈ Preprocessing cancelled by user") |
|
|
return processed_data |
|
|
|
|
|
try: |
|
|
processed_item = self._preprocess_single_item(item) |
|
|
if processed_item: |
|
|
processed_data.append(processed_item) |
|
|
self.processing_stats['successful_processing'] += 1 |
|
|
else: |
|
|
self.processing_stats['failed_processing'] += 1 |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error processing item: {str(e)}") |
|
|
self.processing_stats['failed_processing'] += 1 |
|
|
|
|
|
self.processing_stats['total_processed'] += 1 |
|
|
|
|
|
logger.info(f"Preprocessing completed. Stats: {self.processing_stats}") |
|
|
return processed_data |
|
|
|
|
|
def _preprocess_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]: |
|
|
""" |
|
|
Preprocess a single data item |
|
|
|
|
|
Args: |
|
|
item: Single dictionary containing scraped content |
|
|
|
|
|
Returns: |
|
|
Preprocessed dictionary or None if processing failed |
|
|
""" |
|
|
try: |
|
|
|
|
|
logger.info(f"π Raw item structure for preprocessing:") |
|
|
logger.info(f" - Keys: {list(item.keys())}") |
|
|
logger.info(f" - extracted_text length: {len(item.get('extracted_text', ''))}") |
|
|
logger.info(f" - content length: {len(item.get('content', ''))}") |
|
|
|
|
|
|
|
|
processed_content = self._clean_and_structure_content(item) |
|
|
processed_item = { |
|
|
'id': self._generate_unique_id(item), |
|
|
'source_metadata': self._extract_source_metadata(item), |
|
|
'content': processed_content, |
|
|
'metadata': self._enrich_metadata(processed_content), |
|
|
                'quality_metrics': self._calculate_quality_metrics(processed_content, item),
|
|
'processing_timestamp': datetime.now().isoformat(), |
|
|
'ready_for_operations': True |
|
|
} |
|
|
|
|
|
|
|
|
logger.debug(f"π Processed item structure for {processed_item.get('id', 'unknown')}:") |
|
|
logger.debug(f" - Keys: {list(processed_item.keys())}") |
|
|
logger.debug(f" - Content keys: {list(processed_item.get('content', {}).keys())}") |
|
|
logger.debug(f" - Metadata keys: {list(processed_item.get('metadata', {}).keys())}") |
|
|
|
|
|
|
|
|
if self._validate_processed_item(processed_item): |
|
|
return processed_item |
|
|
else: |
|
|
logger.warning(f"Validation failed for item: {processed_item.get('id', 'unknown')}") |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error preprocessing item: {str(e)}") |
|
|
return None |
|
|
|
|
|
def _generate_unique_id(self, item: Dict[str, Any]) -> str: |
|
|
""" |
|
|
Generate a unique identifier for the content item |
|
|
|
|
|
Args: |
|
|
item: Raw data item |
|
|
|
|
|
Returns: |
|
|
Unique identifier string |
|
|
""" |
|
|
|
|
|
url = item.get('url', '') or item.get('file_path', '') |
|
|
title = item.get('title', '') |
|
|
|
|
|
|
|
|
content_string = f"{url}{title}" |
|
|
return hashlib.md5(content_string.encode()).hexdigest()[:12] |
|
|
|
|
|
def _extract_source_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Extract and structure source metadata |
|
|
|
|
|
Args: |
|
|
item: Raw data item |
|
|
|
|
|
Returns: |
|
|
Dictionary containing source metadata |
|
|
""" |
|
|
|
|
|
content_text = item.get('content', '') or item.get('extracted_text', '') |
|
|
url = item.get('url', '') or item.get('file_path', '') |
|
|
|
|
|
|
|
|
original_source = item.get('source', '') |
|
|
source_website = self._identify_source_website(url) |
|
|
|
|
|
|
|
|
|
|
|
if not original_source and source_website == 'unknown' and url: |
|
|
try: |
|
|
from utils import get_source_from_url |
|
|
original_source = get_source_from_url(url) |
|
|
            except Exception:
|
|
pass |
|
|
|
|
|
result = { |
|
|
'url': url, |
|
|
'title': item.get('title', ''), |
|
|
'date': item.get('date', ''), |
|
|
'category': item.get('category', ''), |
|
|
'source': original_source or self._map_source_website_to_name(source_website), |
|
|
'source_website': source_website, |
|
|
'content_type': self._identify_content_type(item), |
|
|
'file_type': item.get('file_type', ''), |
|
|
'language': self._detect_language(content_text), |
|
|
'pdf_path': item.get('pdf_path', '') or item.get('file_path', ''), |
|
|
'original_structure': { |
|
|
'has_pdf': bool(item.get('pdf_path') or item.get('file_path')), |
|
|
'content_length': len(content_text), |
|
|
'title_length': len(item.get('title', '')) |
|
|
} |
|
|
} |
|
|
|
|
|
logger.debug(f"π Extracted source metadata category: '{result.get('category', '')}'") |
|
|
logger.debug(f"π Preserved source: '{result.get('source', '')}'") |
|
|
return result |
|
|
|
|
|
def _clean_and_structure_content(self, item: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Clean and structure the content for downstream processing |
|
|
|
|
|
Args: |
|
|
item: Raw data item |
|
|
|
|
|
Returns: |
|
|
Dictionary containing cleaned and structured content |
|
|
""" |
|
|
|
|
|
raw_content = item.get('content', '') or item.get('extracted_text', '') |
|
|
|
|
|
|
|
|
logger.info(f"π Content extraction debug:") |
|
|
logger.info(f" - item.get('content', ''): '{item.get('content', '')}'") |
|
|
logger.info(f" - item.get('extracted_text', ''): '{item.get('extracted_text', '')[:100]}...'") |
|
|
logger.info(f" - raw_content length: {len(raw_content)}") |
|
|
|
|
|
|
|
|
cleaned_content = self._clean_text(raw_content) |
|
|
logger.info(f" - cleaned_content length: {len(cleaned_content)}") |
|
|
|
|
|
|
|
|
structured_content = { |
|
|
'raw_text': raw_content, |
|
|
'cleaned_text': cleaned_content, |
|
|
'text_blocks': self._split_into_blocks(cleaned_content), |
|
|
'sentences': self._split_into_sentences(cleaned_content), |
|
|
'summary_ready': self._prepare_for_summarization(cleaned_content), |
|
|
'translation_ready': self._prepare_for_translation(cleaned_content) |
|
|
} |
|
|
|
|
|
return structured_content |
|
|
|
|
|
def _enrich_metadata(self, processed_content: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Enrich metadata with additional information |
|
|
|
|
|
Args: |
|
|
processed_content: Processed content dictionary |
|
|
|
|
|
Returns: |
|
|
Dictionary containing enriched metadata |
|
|
""" |
|
|
|
|
|
content = processed_content.get('cleaned_text', '') |
|
|
|
|
|
return { |
|
|
'word_count': len(content.split()), |
|
|
'character_count': len(content), |
|
|
'sentence_count': len(self._split_into_sentences(content)), |
|
|
'paragraph_count': len(self._split_into_blocks(content)), |
|
|
'reading_time_minutes': self._calculate_reading_time(content), |
|
|
'complexity_score': self._calculate_complexity_score(content) |
|
|
} |
|
|
|
|
|
    def _calculate_quality_metrics(self, processed_content: Dict[str, Any], item: Dict[str, Any]) -> Dict[str, Any]:
|
|
""" |
|
|
Calculate quality metrics for the content |
|
|
|
|
|
Args: |
|
|
            processed_content: Processed content dictionary
            item: Original raw data item, used for title and URL checks
|
|
|
|
|
Returns: |
|
|
Dictionary containing quality metrics |
|
|
""" |
|
|
content = processed_content.get('cleaned_text', '') |
|
|
        title = item.get('title', '')
|
|
|
|
|
return { |
|
|
'content_quality': { |
|
|
'completeness_score': self._calculate_completeness_score(content), |
|
|
'coherence_score': self._calculate_coherence_score(content), |
|
|
'relevance_score': self._calculate_relevance_score(content, title), |
|
|
'readability_score': self._calculate_readability_score(content) |
|
|
}, |
|
|
'data_quality': { |
|
|
'has_title': bool(title.strip()), |
|
|
'has_content': bool(content.strip()), |
|
|
                'has_url': bool((item.get('url', '') or item.get('file_path', '')).strip()),
|
|
'content_length_adequate': len(content) > 100, |
|
|
'title_length_adequate': 10 < len(title) < 200 |
|
|
}, |
|
|
'processing_quality': { |
|
|
'successfully_cleaned': bool(self._clean_text(content)), |
|
|
'successfully_structured': bool(self._split_into_blocks(content)) |
|
|
} |
|
|
} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
""" |
|
|
Clean and normalize text content |
|
|
|
|
|
Args: |
|
|
text: Raw text content |
|
|
|
|
|
Returns: |
|
|
Cleaned text content |
|
|
""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
|
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
text = text.strip() |
|
|
|
|
|
|
|
|
text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text) |
|
|
|
|
|
|
|
|
text = unicodedata.normalize('NFKD', text) |
|
|
|
|
|
|
|
|
text = re.sub(r'[\.]{2,}', '.', text) |
|
|
text = re.sub(r'[!]{2,}', '!', text) |
|
|
text = re.sub(r'[?]{2,}', '?', text) |
|
|
|
|
|
return text |
|
|
|
|
|
def _split_into_blocks(self, text: str) -> List[str]: |
|
|
""" |
|
|
Split text into logical blocks (paragraphs) |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
List of text blocks |
|
|
""" |
|
|
if not text: |
|
|
return [] |
|
|
|
|
|
|
|
|
blocks = re.split(r'\n\s*\n|\.\s+(?=[A-Z])', text) |
|
|
return [block.strip() for block in blocks if block.strip()] |
|
|
|
|
|
def _split_into_sentences(self, text: str) -> List[str]: |
|
|
""" |
|
|
Split text into sentences |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
List of sentences |
|
|
""" |
|
|
if not text: |
|
|
return [] |
|
|
|
|
|
|
|
|
sentences = re.split(r'[.!?]+', text) |
|
|
return [sentence.strip() for sentence in sentences if sentence.strip()] |
|
|
|
|
|
def _prepare_for_summarization(self, text: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Prepare content for summarization |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Dictionary ready for summarization |
|
|
""" |
|
|
blocks = self._split_into_blocks(text) |
|
|
sentences = self._split_into_sentences(text) |
|
|
|
|
|
return { |
|
|
'text': text, |
|
|
'blocks': blocks, |
|
|
'sentences': sentences, |
|
|
'block_count': len(blocks), |
|
|
'sentence_count': len(sentences), |
|
|
'avg_sentence_length': sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0, |
|
|
'summary_priority': self._calculate_summary_priority(text) |
|
|
} |
|
|
|
|
|
def _prepare_for_translation(self, text: str) -> Dict[str, Any]: |
|
|
""" |
|
|
Prepare content for translation |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Dictionary ready for translation |
|
|
""" |
|
|
return { |
|
|
'text': text, |
|
|
'language_detected': self._detect_language(text), |
|
|
'translation_blocks': self._split_into_blocks(text), |
|
|
'character_count': len(text), |
|
|
'word_count': len(text.split()), |
|
|
'translation_priority': self._calculate_translation_priority(text) |
|
|
} |
|
|
|
|
|
def _identify_source_website(self, url: str) -> str: |
|
|
""" |
|
|
Identify the source website from URL |
|
|
|
|
|
Args: |
|
|
url: URL string |
|
|
|
|
|
Returns: |
|
|
Website identifier |
|
|
""" |
|
|
if 'reliefweb.int' in url: |
|
|
return 'reliefweb' |
|
|
elif 'fscluster.org' in url: |
|
|
return 'fscluster' |
|
|
elif 'mopnd.govsomaliland.org' in url: |
|
|
return 'mopnd' |
|
|
elif 'nbs.gov.so' in url: |
|
|
return 'nbs' |
|
|
elif 'humdata.org' in url: |
|
|
return 'hdx' |
|
|
elif 'logcluster.org' in url: |
|
|
return 'logcluster' |
|
|
elif 'fsnau.org' in url: |
|
|
return 'fsnau' |
|
|
elif 'fews.net' in url: |
|
|
return 'fews' |
|
|
elif 'icpac.net' in url: |
|
|
if 'seasonal-forecast' in url.lower(): |
|
|
return 'icpac_seasonal_forecast' |
|
|
else: |
|
|
return 'icpac' |
|
|
elif 'faoswalim.org' in url: |
|
|
return 'faoswalim' |
|
|
else: |
|
|
return 'unknown' |
|
|
|
|
|
def _map_source_website_to_name(self, source_website: str) -> str: |
|
|
""" |
|
|
Map source website identifier to proper source name |
|
|
|
|
|
Args: |
|
|
source_website: Website identifier (lowercase) |
|
|
|
|
|
Returns: |
|
|
Proper source name |
|
|
""" |
|
|
mapping = { |
|
|
'reliefweb': 'ReliefWeb', |
|
|
'fscluster': 'FS Cluster', |
|
|
'mopnd': 'MOPND Somaliland', |
|
|
'nbs': 'NBS Somalia', |
|
|
'hdx': 'HDX Humanitarian Data Exchange', |
|
|
'logcluster': 'LogCluster', |
|
|
'fsnau': 'FSNau - Food Security and Nutrition Analysis Unit', |
|
|
'fews': 'FEWS NET', |
|
|
'icpac': 'ICPAC', |
|
|
'icpac_seasonal_forecast': 'ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast', |
|
|
'faoswalim': 'FAO SWALIM' |
|
|
} |
|
|
return mapping.get(source_website, 'Unknown') |
|
|
|
|
|
def _identify_content_type(self, item: Dict[str, Any]) -> str: |
|
|
""" |
|
|
Identify the type of content |
|
|
|
|
|
Args: |
|
|
item: Raw data item |
|
|
|
|
|
Returns: |
|
|
Content type identifier |
|
|
""" |
|
|
|
|
|
if item.get('file_type'): |
|
|
file_type = item.get('file_type', '').lower() |
|
|
if 'pdf' in file_type: |
|
|
return 'pdf_document' |
|
|
elif 'doc' in file_type: |
|
|
return 'word_document' |
|
|
elif 'csv' in file_type: |
|
|
return 'csv_data' |
|
|
else: |
|
|
return f'{file_type}_document' |
|
|
|
|
|
|
|
|
elif item.get('pdf_path') or item.get('file_path'): |
|
|
return 'pdf_document' |
|
|
|
|
|
|
|
|
url = item.get('url', '') or item.get('file_path', '') |
|
|
if 'article' in url.lower(): |
|
|
return 'article' |
|
|
elif 'publication' in url.lower(): |
|
|
return 'publication' |
|
|
elif 'journal' in url.lower(): |
|
|
return 'journal' |
|
|
elif 'event' in url.lower(): |
|
|
return 'event' |
|
|
else: |
|
|
return 'general' |
|
|
|
|
|
def _detect_language(self, text: str) -> str: |
|
|
""" |
|
|
Detect language of the text (simplified) |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Language code |
|
|
""" |
|
|
if not text: |
|
|
return 'unknown' |
|
|
|
|
|
|
|
|
somali_words = ['somalia', 'somaliland', 'puntland', 'mogadishu', 'hargeisa'] |
|
|
english_words = ['the', 'and', 'of', 'in', 'to', 'for', 'with', 'on', 'at'] |
|
|
|
|
|
text_lower = text.lower() |
|
|
somali_count = sum(1 for word in somali_words if word in text_lower) |
|
|
english_count = sum(1 for word in english_words if word in text_lower) |
|
|
|
|
|
if somali_count > english_count: |
|
|
return 'so' |
|
|
elif english_count > somali_count: |
|
|
return 'en' |
|
|
else: |
|
|
return 'unknown' |
|
|
|
|
|
def _calculate_reading_time(self, text: str) -> float: |
|
|
""" |
|
|
Calculate estimated reading time in minutes |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Reading time in minutes |
|
|
""" |
|
|
word_count = len(text.split()) |
|
|
return round(word_count / 200, 1) |
|
|
|
|
|
def _calculate_complexity_score(self, text: str) -> float: |
|
|
""" |
|
|
Calculate text complexity score |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Complexity score (0-1) |
|
|
""" |
|
|
if not text: |
|
|
return 0.0 |
|
|
|
|
|
sentences = self._split_into_sentences(text) |
|
|
if not sentences: |
|
|
return 0.0 |
|
|
|
|
|
avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) |
|
|
long_words = sum(1 for word in text.split() if len(word) > 6) |
|
|
total_words = len(text.split()) |
|
|
|
|
|
complexity = (avg_sentence_length / 20) + (long_words / total_words if total_words > 0 else 0) |
|
|
return min(complexity, 1.0) |
|
|
|
|
|
def _calculate_completeness_score(self, content: str) -> float: |
|
|
""" |
|
|
Calculate content completeness score |
|
|
|
|
|
Args: |
|
|
content: Text content |
|
|
|
|
|
Returns: |
|
|
Completeness score (0-1) |
|
|
""" |
|
|
if not content: |
|
|
return 0.0 |
|
|
|
|
|
score = 0.0 |
|
|
|
|
|
|
|
|
if len(content) > 100: |
|
|
score += 0.3 |
|
|
|
|
|
|
|
|
sentences = self._split_into_sentences(content) |
|
|
if len(sentences) > 3: |
|
|
score += 0.3 |
|
|
|
|
|
|
|
|
blocks = self._split_into_blocks(content) |
|
|
if len(blocks) > 1: |
|
|
score += 0.2 |
|
|
|
|
|
|
|
|
if len(content.split()) > 10: |
|
|
score += 0.2 |
|
|
|
|
|
return min(score, 1.0) |
|
|
|
|
|
def _calculate_coherence_score(self, content: str) -> float: |
|
|
""" |
|
|
Calculate content coherence score |
|
|
|
|
|
Args: |
|
|
content: Text content |
|
|
|
|
|
Returns: |
|
|
Coherence score (0-1) |
|
|
""" |
|
|
if not content: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
        sentences = re.split(r'(?<=[.!?])\s+', content.strip())
|
|
if len(sentences) < 2: |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
proper_endings = sum(1 for s in sentences if s.endswith(('.', '!', '?'))) |
|
|
coherence = proper_endings / len(sentences) |
|
|
|
|
|
return min(coherence, 1.0) |
|
|
|
|
|
def _calculate_relevance_score(self, content: str, title: str) -> float: |
|
|
""" |
|
|
Calculate content relevance score |
|
|
|
|
|
Args: |
|
|
content: Text content |
|
|
title: Title text |
|
|
|
|
|
Returns: |
|
|
Relevance score (0-1) |
|
|
""" |
|
|
if not content or not title: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
title_words = set(title.lower().split()) |
|
|
content_words = set(content.lower().split()) |
|
|
|
|
|
overlap = len(title_words.intersection(content_words)) |
|
|
relevance = overlap / len(title_words) if title_words else 0.0 |
|
|
|
|
|
return min(relevance, 1.0) |
|
|
|
|
|
def _calculate_readability_score(self, content: str) -> float: |
|
|
""" |
|
|
Calculate readability score |
|
|
|
|
|
Args: |
|
|
content: Text content |
|
|
|
|
|
Returns: |
|
|
Readability score (0-1) |
|
|
""" |
|
|
if not content: |
|
|
return 0.0 |
|
|
|
|
|
sentences = self._split_into_sentences(content) |
|
|
words = content.split() |
|
|
|
|
|
if not sentences or not words: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
avg_sentence_length = len(words) / len(sentences) |
|
|
avg_word_length = sum(len(word) for word in words) / len(words) |
|
|
|
|
|
|
|
|
readability = 1.0 - (avg_sentence_length / 50) - (avg_word_length / 10) |
|
|
|
|
|
return max(0.0, min(readability, 1.0)) |
|
|
|
|
|
def _calculate_summary_priority(self, text: str) -> str: |
|
|
""" |
|
|
Calculate summary priority |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Priority level |
|
|
""" |
|
|
word_count = len(text.split()) |
|
|
|
|
|
if word_count > 1000: |
|
|
return 'high' |
|
|
elif word_count > 500: |
|
|
return 'medium' |
|
|
else: |
|
|
return 'low' |
|
|
|
|
|
def _calculate_translation_priority(self, text: str) -> str: |
|
|
""" |
|
|
Calculate translation priority |
|
|
|
|
|
Args: |
|
|
text: Text content |
|
|
|
|
|
Returns: |
|
|
Priority level |
|
|
""" |
|
|
|
|
|
important_keywords = ['emergency', 'crisis', 'disaster', 'flood', 'drought', 'food', 'security'] |
|
|
text_lower = text.lower() |
|
|
|
|
|
if any(keyword in text_lower for keyword in important_keywords): |
|
|
return 'high' |
|
|
elif len(text) > 500: |
|
|
return 'medium' |
|
|
else: |
|
|
return 'low' |
|
|
|
|
|
def _validate_processed_item(self, item: Dict[str, Any]) -> bool: |
|
|
""" |
|
|
Validate processed item |
|
|
|
|
|
Args: |
|
|
item: Processed item |
|
|
|
|
|
Returns: |
|
|
True if valid, False otherwise |
|
|
""" |
|
|
required_fields = ['id', 'source_metadata', 'content', 'metadata'] |
|
|
|
|
|
|
|
|
missing_fields = [] |
|
|
for field in required_fields: |
|
|
if field not in item: |
|
|
missing_fields.append(field) |
|
|
|
|
|
if missing_fields: |
|
|
logger.warning(f"β Missing required fields: {missing_fields}") |
|
|
logger.warning(f"π Available fields: {list(item.keys())}") |
|
|
return False |
|
|
|
|
|
|
|
|
content = item.get('content', {}) |
|
|
cleaned_text = content.get('cleaned_text', '') |
|
|
if not cleaned_text: |
|
|
logger.warning(f"β No cleaned_text found in content") |
|
|
logger.warning(f"π Content structure: {content}") |
|
|
return False |
|
|
|
|
|
|
|
|
metadata = item.get('metadata', {}) |
|
|
word_count = metadata.get('word_count', 0) |
|
|
if word_count < 10: |
|
|
logger.warning(f"β Word count too low: {word_count} (minimum: 10)") |
|
|
logger.warning(f"π Metadata: {metadata}") |
|
|
return False |
|
|
|
|
|
logger.debug(f"β
Validation passed for item {item.get('id', 'unknown')}") |
|
|
return True |
|
|
|
|
|
def get_processing_stats(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Get processing statistics |
|
|
|
|
|
Returns: |
|
|
Dictionary containing processing statistics |
|
|
""" |
|
|
return self.processing_stats.copy() |
|
|
|
|
|
|
|
|
def preprocess_scraped_data(raw_data: List[Dict[str, Any]], output_path: Optional[str] = None) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Convenience function to preprocess scraped data |
|
|
|
|
|
Args: |
|
|
raw_data: List of raw scraped data |
|
|
output_path: Optional output file path (deprecated - not used) |
|
|
|
|
|
Returns: |
|
|
List of preprocessed data |
|
|
""" |
|
|
preprocessor = DataPreprocessor() |
|
|
processed_data = preprocessor.preprocess_all_data(raw_data) |
|
|
|
|
|
return processed_data |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
sample_data = [ |
|
|
{ |
|
|
'title': 'Sample Article', |
|
|
'content': 'This is a sample article about water management in Somalia.', |
|
|
'url': 'https://example.com/article1', |
|
|
'date': '2024-01-01' |
|
|
} |
|
|
] |
|
|
|
|
|
processed = preprocess_scraped_data(sample_data) |
|
|
print(f"Processed {len(processed)} items") |