#!/usr/bin/env python3
"""
Unified Processing Pipeline for News Dashboard
Handles both text and document processing with a clean, cohesive interface
"""

import logging
import pandas as pd
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
import os

from scraper_common import scrape_news_async
from document_processor import process_documents_from_url
from data_preprocessor import DataPreprocessor
from scraper_common import scraping_cancelled


def determine_website_type(url: str) -> str:
    """
    Determine website type from URL based on domain patterns and URL paths
    """
    from urllib.parse import urlparse

    try:
        parsed_url = urlparse(url)
        domain = parsed_url.netloc.lower()
        url_lower = url.lower()

        # Check for specific URL paths first (more specific matches)
        if "frrims.faoswalim.org" in domain:
            return "faoswalim_frrims_river_levels"
        elif "faoswalim.org" in domain:
            if "water/water-publications" in url_lower or "water-publications" in url_lower:
                return "faoswalim_water_publications"
            elif "flood-watch-bulletin" in url_lower or "ag-document-type/flood-watch-bulletin" in url_lower:
                return "faoswalim_flood_watch"
            elif "swalim-articles" in url_lower:
                return "faoswalim_articles"
            elif "swalim-events" in url_lower:
                return "faoswalim_events"
            elif "swalim-journals" in url_lower:
                return "faoswalim_journals"
            elif "swalim-publications" in url_lower:
                return "faoswalim_publications"
            else:
                return "faoswalim"
        elif "fsnau.org" in domain:
            if "publications" in url_lower:
                return "fsnau_publications"
            else:
                return "fsnau"

        # Check for ICPAC seasonal forecast path
        if "icpac.net" in domain:
            if "seasonal-forecast" in url_lower:
                return "icpac_seasonal_forecast"
            else:
                return "icpac"

        # Map domains to website types
        domain_mapping = {
            'reliefweb.int': 'reliefweb',
            'fscluster.org': 'fscluster',
            'mopnd.govsomaliland.org': 'mopnd',
            'nbs.gov.so': 'nbs',
            'data.humdata.org': 'hdx',
            'logcluster.org': 'logcluster',
            'fews.net': 'fews',
            'hiiraan.com': 'hiiraan',
            'ocha.un.org': 'ocha',
            'unocha.org': 'ocha',
            'sodma.gov.so': 'sodma',
            'atmis-au.org': 'atmis',
            'garoweonline.com': 'garowe',
            'goobjoog.com': 'goobjoog',
            'radiodalsan.com': 'radiodalsan',
            'radioergo.org': 'radioergo',
            'drought.emergency.copernicus.eu': 'copernicus_drought'
        }

        # Check for exact domain matches
        for domain_pattern, website_type in domain_mapping.items():
            if domain_pattern in domain:
                return website_type

        # Default fallback
        return 'unknown'

    except Exception as e:
        logger.warning(f"Error determining website type from URL {url}: {str(e)}")
        return 'unknown'


# Try to import model processor, handle gracefully if not available
try:
    from model_processor import ModelProcessor
    MODELS_AVAILABLE = True
except ImportError as e:
    print(f"Warning: Model processor not available: {e}")
    print("AI features will be disabled. Install torch and transformers for full functionality.")
    ModelProcessor = None
    MODELS_AVAILABLE = False
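
# Illustrative examples (added, not from the original module): determine_website_type()
# matches on domain and path substrings, e.g.
#   determine_website_type("https://fsnau.org/publications")  -> "fsnau_publications"
#   determine_website_type("https://reliefweb.int/updates")   -> "reliefweb"
# Any unmatched domain falls back to "unknown".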
# Configure detailed logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


class UnifiedPipeline:
    """
    Unified pipeline for processing both text and document content
    """

    def __init__(self, device: str = "auto"):
        """
        Initialize the unified pipeline

        Args:
            device: Device to run models on
        """
        self.device = device
        self.data_preprocessor = None
        self.model_processor = None
        self.initialized = False

        # Processing statistics
        self.stats = {
            'total_processed': 0,
            'preprocessing_success': 0,
            'model_processing_success': 0,
            'final_success': 0,
            'errors': []
        }

    def initialize(self) -> bool:
        """
        Initialize all processors

        Returns:
            True if all processors initialized successfully
        """
        logger.info("🚀 Starting UnifiedPipeline initialization")

        if self.initialized:
            logger.info("✅ Pipeline already initialized, skipping")
            return True

        try:
            # Initialize data preprocessor
            logger.info("🔧 Initializing data preprocessor...")
            logger.debug(f"📋 Device configuration: {self.device}")
            self.data_preprocessor = DataPreprocessor()

            # Initialize model processor (if available)
            if MODELS_AVAILABLE and ModelProcessor is not None:
                logger.info("🤖 Initializing model processor...")
                logger.debug(f"🔧 Model processor device: {self.device}")
                self.model_processor = ModelProcessor(device=self.device)
                if not self.model_processor.load_models():
                    logger.warning("⚠️ Model processor failed to load, continuing without AI features")
                    self.model_processor = None
                else:
                    logger.info("✅ Model processor loaded successfully")
            else:
                logger.warning("⚠️ Model processor not available, continuing without AI features")
                self.model_processor = None

            self.initialized = True
            logger.info("✅ Pipeline initialization completed successfully")
            logger.debug(f"📊 Initialization stats: {self.stats}")
            return True

        except Exception as e:
            logger.error(f"Error initializing pipeline: {str(e)}")
            self.stats['errors'].append(f"Initialization error: {str(e)}")
            return False

    async def process_text_content(self, url: str, custom_keywords: str = "",
                                   start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
        """
        Process text content from URL through the complete pipeline

        Args:
            url: URL to scrape content from
            custom_keywords: Custom keywords for filtering (comma-separated)
            start_date: Optional start date for filtering
            end_date: Optional end date for filtering

        Returns:
            Tuple of (DataFrame, full_content_data)
        """
        try:
            logger.info(f"🚀 Starting text content processing for URL: {url}")
            logger.debug(f"📋 Processing parameters: URL={url}")

            # Check for cancellation before starting
            if scraping_cancelled():
                logger.warning("⚠️ Processing cancelled before starting")
                return pd.DataFrame(), []

            # Step 1: Scrape content
            logger.info("📡 Step 1: Scraping content...")
            logger.debug("🔍 Initiating website scraping...")

            # Determine website type from URL
            website_type = determine_website_type(url)
            logger.debug(f"🌐 Detected website type: {website_type}")

            # Force text scraper mode when called from process_text_content
            scraped_articles = await scrape_news_async(url, website_type, custom_keywords,
                                                       start_date, end_date, force_mode="text")

            # Check for cancellation after scraping
            if scraping_cancelled():
                logger.warning("⚠️ Processing cancelled after scraping")
                return pd.DataFrame(), []

            if not scraped_articles:
                logger.warning("⚠️ No articles found to process")
                return pd.DataFrame(), []

            # Check for special error indicators
            if len(scraped_articles) == 1:
                first_article = scraped_articles[0]
                if first_article.get("title") == "WEBSITE_LOAD_ERROR":
                    error_msg = first_article.get("content", "Website is not working. Please try again later.")
                    logger.error(f"❌ {error_msg}")
                    raise Exception(error_msg)
                elif first_article.get("title") == "CAPTCHA_ERROR":
                    error_msg = first_article.get("content", "Captcha detected. Please try again later.")
                    logger.error(f"❌ {error_msg}")
                    raise Exception(error_msg)

            logger.info(f"✅ Scraped {len(scraped_articles)} articles")
            logger.debug(f"📊 Article details: {[article.get('title', 'No title') for article in scraped_articles]}")

            # Step 2: Preprocessing
            logger.info("Step 2: Preprocessing content...")
            if not self.initialize():
                logger.warning("Pipeline initialization failed, using raw data")
                preprocessed_articles = scraped_articles
            else:
                preprocessed_articles = self.data_preprocessor.preprocess_all_data(scraped_articles)
                self.stats['preprocessing_success'] = len(preprocessed_articles)
                logger.info(f"Preprocessing completed: {len(preprocessed_articles)} articles processed")

            # Check for cancellation after preprocessing
            if scraping_cancelled():
                logger.warning("⚠️ Processing cancelled after preprocessing")
                return pd.DataFrame(), []

            # Step 3: Model processing and DataFrame creation
            logger.info("Step 3: Processing with AI models and creating DataFrame...")
            df_data = []
            full_content_data = []

            for i, article in enumerate(preprocessed_articles, 1):
                # Check for cancellation during processing
                if scraping_cancelled():
                    logger.warning("⚠️ Processing cancelled during model processing")
                    return pd.DataFrame(), []

                # Extract content based on preprocessing status
                content_info = self._extract_content_info(article, is_preprocessed=self.initialized)

                # Process with AI models if available
                summary, summary_somali = self._process_with_models(content_info['content'])

                # Create DataFrame row
                row_data = {
                    '#': str(i),
                    'title': content_info['title'],
                    'category': content_info.get('category', ''),
                    'content': content_info['content'],
                    'summary': summary,
                    'summary_somali': summary_somali,
                    'date': content_info['date'],
                    'url': content_info['url']
                }
                logger.debug(f"DataFrame row data: {row_data}")
                df_data.append(row_data)

                # Store full content for modal
                full_content_data.append({
                    'title': content_info['title'],
                    'content': content_info['content'],
                    'date': content_info['date'],
                    'url': content_info['url']
                })

            df = pd.DataFrame(df_data)

            self.stats['total_processed'] = len(df_data)
            self.stats['final_success'] = len(df_data)

            logger.info(f"Text content processing completed: {len(df_data)} items processed")
            logger.info(f"DataFrame columns: {list(df.columns)}")
            logger.info(f"DataFrame shape: {df.shape}")
            if not df.empty:
                logger.info(f"Sample DataFrame row: {df.iloc[0].to_dict()}")

            return df, full_content_data

        except Exception as e:
            logger.error(f"Error in text content processing: {str(e)}")
            self.stats['errors'].append(f"Text processing error: {str(e)}")
            return pd.DataFrame([{
                '#': '1',
                'title': f'Error: {str(e)}',
                'content': '',
                'summary': '',
                'summary_somali': '',
                'date': '',
                'url': url
            }]), []

    async def process_document_content(self, url: str, start_date: Optional[str] = None,
                                       end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Process document content from URL through the complete pipeline

        Args:
            url: URL to process documents from
            start_date: Optional start date for filtering
            end_date: Optional end date for filtering

        Returns:
            DataFrame with processed document content
        """
        try:
            logger.info(f"Starting document content processing for URL: {url}")

            # Check for cancellation before starting
            if scraping_cancelled():
                logger.warning("⚠️ Document processing cancelled before starting")
                return pd.DataFrame()

            # Step 1: Extract documents
            logger.info("Step 1: Extracting documents...")
            documents_data = await process_documents_from_url(url.strip())

            # Check for cancellation after document extraction
            if scraping_cancelled():
                logger.warning("⚠️ Document processing cancelled after extraction")
                return pd.DataFrame()

            if not documents_data:
                logger.warning("No documents found to process")
                return pd.DataFrame()

            # Check for special error indicators
            if len(documents_data) == 1:
                first_doc = documents_data[0]
                if first_doc.get("title") == "WEBSITE_LOAD_ERROR":
                    error_msg = first_doc.get("content", "Website is not working. Please try again later.")
                    logger.error(f"❌ {error_msg}")
                    raise Exception(error_msg)
                elif first_doc.get("title") == "CAPTCHA_ERROR":
                    error_msg = first_doc.get("content", "Captcha detected. Please try again later.")
                    logger.error(f"❌ {error_msg}")
                    raise Exception(error_msg)

            logger.info(f"Extracted {len(documents_data)} documents")

            # Step 2: Preprocessing
            logger.info("Step 2: Preprocessing documents...")
            if not self.initialize():
                logger.warning("Pipeline initialization failed, using raw data")
                preprocessed_docs = documents_data
            else:
                preprocessed_docs = self.data_preprocessor.preprocess_all_data(documents_data)
                self.stats['preprocessing_success'] = len(preprocessed_docs)
                logger.info(f"Preprocessing completed: {len(preprocessed_docs)} documents processed")

            # Check for cancellation after preprocessing
            if scraping_cancelled():
                logger.warning("⚠️ Document processing cancelled after preprocessing")
                return pd.DataFrame()

            # Step 3: Model processing and DataFrame creation
            logger.info("Step 3: Processing with AI models and creating DataFrame...")
            df_data = []

            # Apply date filtering if provided (backup filter)
            from date_filter import is_date_in_range, parse_date_input
            start_dt = parse_date_input(start_date) if start_date else None
            end_dt = parse_date_input(end_date) if end_date else None

            for doc in preprocessed_docs:
                # Check for cancellation during processing
                if scraping_cancelled():
                    logger.warning("⚠️ Document processing cancelled during model processing")
                    return pd.DataFrame()

                # Extract content based on preprocessing status
                content_info = self._extract_document_info(doc, is_preprocessed=self.initialized)

                # Apply date filtering (backup filter in case dates weren't filtered at scraper level)
                if start_dt is not None or end_dt is not None:
                    doc_date = content_info.get('date', '')
                    if not is_date_in_range(doc_date, start_dt, end_dt, include_missing=False):
                        logger.debug(f"📅 Document date {doc_date} is outside date range - filtering out in pipeline")
                        continue

                # Skip summary generation for CSV and PNG files
                file_type = content_info.get('file_type', '').upper()
                if file_type in ('CSV', 'PNG'):
                    summary = ""
                    summary_somali = ""
                    logger.debug(f"⏭️ Skipping summary generation for {file_type} file: {content_info.get('title', 'Unknown')}")
                else:
                    # Process with AI models if available
                    summary, summary_somali = self._process_with_models(content_info['extracted_text'])

                # Create DataFrame row
                df_data.append({
                    'title': content_info['title'],
                    'date': content_info['date'],
                    'source': content_info['source'],
                    'file_path': content_info['file_path'],
                    'extracted_text': content_info['extracted_text'],
                    'summary': summary,
                    'summary_somali': summary_somali,
                    'file_type': content_info['file_type']
                })

            df = pd.DataFrame(df_data)

            self.stats['total_processed'] = len(df_data)
            self.stats['final_success'] = len(df_data)

            logger.info(f"Document content processing completed: {len(df_data)} items processed")

            return df

        except Exception as e:
            logger.error(f"Error in document content processing: {str(e)}")
            self.stats['errors'].append(f"Document processing error: {str(e)}")
            return pd.DataFrame([{
                'title': f'Error: {str(e)}',
                'date': '',
                'source': '',
                'file_path': '',
                'extracted_text': '',
                'summary': '',
                'summary_somali': '',
                'file_type': ''
            }])

    def _extract_content_info(self, article: Dict[str, Any], is_preprocessed: bool) -> Dict[str, str]:
        """
        Extract content information from article

        Args:
            article: Article data
            is_preprocessed: Whether the article has been preprocessed

        Returns:
            Dictionary with content information
        """
        if is_preprocessed and isinstance(article, dict) and 'content' in article:
            # Use preprocessed content
            content_data = article.get('content', {})
            if isinstance(content_data, dict):
                return {
                    'title': article.get('source_metadata', {}).get('title', ''),
                    'content': content_data.get('cleaned_text', ''),
                    'date': article.get('source_metadata', {}).get('date', ''),
                    'url': article.get('source_metadata', {}).get('url', ''),
                    'category': article.get('source_metadata', {}).get('category', '')
                }

        # Fallback to original structure
        result = {
            'title': article.get('title', ''),
            'content': article.get('content', ''),
            'date': article.get('date', ''),
            'url': article.get('url', ''),
            'category': article.get('category', '')
        }
        logger.debug(f"Extracted content info: {result}")
        return result

    def _extract_document_info(self, doc: Dict[str, Any], is_preprocessed: bool) -> Dict[str, str]:
        """
        Extract document information

        Args:
            doc: Document data
            is_preprocessed: Whether the document has been preprocessed

        Returns:
            Dictionary with document information
        """
        if is_preprocessed and isinstance(doc, dict) and 'content' in doc:
            # Use preprocessed content
            content_data = doc.get('content', {})
            source_metadata = doc.get('source_metadata', {})
            if isinstance(content_data, dict):
                # Use 'source' field from source_metadata if available, otherwise fall back
                # to source_website. If source_website is available but source is not, map it.
                source = source_metadata.get('source', '')
                if not source:
                    source_website = source_metadata.get('source_website', '')
                    if source_website and source_website != 'unknown':
                        # Map source_website to proper name
                        preprocessor = DataPreprocessor()
                        source = preprocessor._map_source_website_to_name(source_website)
                    else:
                        # Last resort: try to get source from URL
                        url = source_metadata.get('url', '') or source_metadata.get('pdf_path', '')
                        if url:
                            try:
                                from utils import get_source_from_url
                                source = get_source_from_url(url)
                            except Exception:
                                source = 'Unknown'
                        else:
                            source = 'Unknown'

                return {
                    'title': source_metadata.get('title', ''),
                    'extracted_text': content_data.get('cleaned_text', ''),
                    'date': source_metadata.get('date', ''),
                    'source': source,
                    'file_path': source_metadata.get('pdf_path', ''),
                    'file_type': source_metadata.get('file_type', '') or source_metadata.get('content_type', '')
                }

        # Fallback to original structure
        source = doc.get('source', '')
        if not source:
            # Try to get source from URL if available
            url = doc.get('url', '') or doc.get('file_path', '') or doc.get('pdf_path', '')
            if url:
                try:
                    from utils import get_source_from_url
                    source = get_source_from_url(url)
                except Exception:
                    source = 'Unknown'
            else:
                source = 'Unknown'

        return {
            'title': doc.get('title', ''),
            'extracted_text': doc.get('extracted_text', ''),
            'date': doc.get('date', ''),
            'source': source,
            'file_path': doc.get('pdf_path', '') or doc.get('local_path', ''),
            'file_type': doc.get('file_type', '')
        }

    def _process_with_models(self, content: str) -> Tuple[str, str]:
        """
        Process content with AI models

        Args:
            content: Text content to process

        Returns:
            Tuple of (summary, summary_somali)
        """
        if not self.model_processor or not content.strip():
            return "", ""

        try:
            model_results = self.model_processor.process_content(content)
            if model_results.get('processing_success', False):
                self.stats['model_processing_success'] += 1
                return model_results.get('summary', ''), model_results.get('summary_somali', '')
        except Exception as e:
            logger.error(f"Error in model processing: {str(e)}")
            self.stats['errors'].append(f"Model processing error: {str(e)}")

        return "", ""

    def get_stats(self) -> Dict[str, Any]:
        """
        Get processing statistics

        Returns:
            Dictionary with processing statistics
        """
        return {
            'pipeline_stats': self.stats.copy(),
            'preprocessing_stats': self.data_preprocessor.get_processing_stats() if self.data_preprocessor else {},
            'model_info': self.model_processor.get_model_info() if self.model_processor else {}
        }

    def reset_stats(self):
        """Reset processing statistics"""
        self.stats = {
            'total_processed': 0,
            'preprocessing_success': 0,
            'model_processing_success': 0,
            'final_success': 0,
            'errors': []
        }


# Global pipeline instance
_pipeline = None


def get_pipeline(device: str = "auto") -> UnifiedPipeline:
    """
    Get or create the global pipeline instance

    Args:
        device: Device to run models on

    Returns:
        UnifiedPipeline instance
    """
    global _pipeline
    if _pipeline is None:
        _pipeline = UnifiedPipeline(device=device)
    return _pipeline


async def process_text_content(url: str, custom_keywords: str = "",
                               start_date: Optional[str] = None,
                               end_date: Optional[str] = None) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
    """
    Convenience function to process text content

    Args:
        url: URL to process
        custom_keywords: Custom keywords for filtering (comma-separated)
        start_date: Optional start date for filtering
        end_date: Optional end date for filtering

    Returns:
        Tuple of (DataFrame, full_content_data)
    """
    pipeline = get_pipeline()
    return await pipeline.process_text_content(url, custom_keywords, start_date, end_date)


async def process_document_content(url: str, start_date: Optional[str] = None,
                                   end_date: Optional[str] = None) -> pd.DataFrame:
    """
    Convenience function to process document content

    Args:
        url: URL to process
        start_date: Optional start date for filtering
        end_date: Optional end date for filtering

    Returns:
        DataFrame with processed content
    """
    pipeline = get_pipeline()
    return await pipeline.process_document_content(url, start_date, end_date)


def get_processing_stats() -> Dict[str, Any]:
    """
    Get processing statistics

    Returns:
        Dictionary with processing statistics
    """
    pipeline = get_pipeline()
    return pipeline.get_stats()
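

# Usage sketch (illustrative, not part of the original module). The URL below is a
# hypothetical placeholder; the convenience wrappers are async coroutines, so they are
# driven here with asyncio.run().
if __name__ == "__main__":
    import asyncio

    async def _demo():
        # Hypothetical publications page used purely for illustration
        df = await process_document_content(
            "https://example.org/publications",
            start_date="2024-01-01",
            end_date="2024-12-31",
        )
        print(df.head())
        print(get_processing_stats())

    asyncio.run(_demo())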