""" Common scraper functions - shared utilities for document and text scraping """ import asyncio import logging import os import json import hashlib from datetime import datetime from typing import List, Dict, Any from urllib.parse import urljoin, urlparse from playwright.async_api import async_playwright # --- Minimal Playwright hardening for headless containers (ADDED) --- os.environ.setdefault("PLAYWRIGHT_BROWSERS_PATH", "/root/.cache/ms-playwright") PLAYWRIGHT_LAUNCH_KW = dict( headless=True, # critical in HF Spaces/containers (no X server) args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--no-zygote", "--single-process", "--disable-extensions", "--disable-background-networking", ], ) # -------------------------------------------------------------------- # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' ) logger = logging.getLogger(__name__) # Global timeout tracking for problematic URLs TIMEOUT_URLS = set() # Global flag for document-only scraping mode (text tab should ignore documents) DOCUMENT_ONLY_MODE = False # Global cancellation flag _scraping_cancelled = False # Global browser instance for cancellation current_browser = None current_page = None # Global captcha status for UI updates _captcha_status = None # Global constants for limiting scraping scope # Set these to None to disable limits, or to a number to limit MAX_PDF_LIMIT = 50 # Global limit to only process/download PDFs across all pages MAX_ARTICLE_LIMIT = 50 # Limit to only process 3 articles MAX_PAGE_LIMIT = 50 # Limit to only scrape 3 pages # Global PDF counter to track PDFs across all pages global_pdf_count = 0 def reset_global_pdf_count(): """Reset the global PDF counter""" global global_pdf_count global_pdf_count = 0 def increment_global_pdf_count(): """Increment the global PDF counter and return the new count""" global global_pdf_count global_pdf_count += 1 return global_pdf_count def get_global_pdf_count(): """Get the current global PDF count""" return global_pdf_count def is_pdf_limit_reached(): """Check if the global PDF limit has been reached""" if MAX_PDF_LIMIT is None: return False return global_pdf_count >= MAX_PDF_LIMIT # Archive management ARCHIVE_DIR = "archive" ARCHIVE_INDEX = os.path.join(ARCHIVE_DIR, "archive_index.json") # Load website configuration def load_website_config(): """Load website configuration from JSON file""" try: with open('website_config.json', 'r') as f: config = json.load(f) logger.info("✅ Website configuration loaded successfully") return config except Exception as e: logger.error(f"❌ Error loading website configuration: {str(e)}") return {} # Load the website configuration WEBSITE_CONFIG = load_website_config() def get_pdf_websites() -> List[str]: """ Dynamically get list of PDF websites from website_config.json A website is considered a PDF website if it has 'pdf_links', 'file_links', or 'extract_table_as_csv' in its config """ pdf_websites = [] for website_type, config in WEBSITE_CONFIG.items(): if config and isinstance(config, dict): # Check if config has pdf_links, file_links, or extract_table_as_csv if config.get("pdf_links") or config.get("file_links") or config.get("extract_table_as_csv"): pdf_websites.append(website_type) return pdf_websites def get_content_websites() -> List[str]: """ Dynamically get list of content (text) websites from website_config.json A website is considered a content website if it does NOT have 'pdf_links' or 'file_links' """ content_websites = [] for website_type, config in WEBSITE_CONFIG.items(): if config and isinstance(config, dict): if not config.get("pdf_links") and not config.get("file_links"): content_websites.append(website_type) return content_websites # Debug: Print configured website types when module loads _debug_pdf_websites = get_pdf_websites() _debug_content_websites = get_content_websites() logger.debug(f"📄 PDF Websites configured ({len(_debug_pdf_websites)}): {sorted(_debug_pdf_websites)}") logger.debug(f"📰 Content Websites configured ({len(_debug_content_websites)}): {sorted(_debug_content_websites)}") def validate_website_config(config: dict) -> tuple[bool, str]: """ Validate website configuration structure Args: config: Configuration dictionary to validate Returns: Tuple of (is_valid, error_message) """ try: if not isinstance(config, dict): return False, "Configuration must be a dictionary" for website_type, website_config in config.items(): if not isinstance(website_type, str): return False, f"Website type must be a string, got {type(website_type)}" # Validate website type name (no spaces, valid identifier) if ' ' in website_type or not website_type: return False, f"Website type '{website_type}' must be a valid identifier (no spaces)" if not isinstance(website_config, dict): return False, f"Configuration for '{website_type}' must be a dictionary" # Check required fields: title and content (at least one must be present) if 'title' not in website_config and 'content' not in website_config: return False, f"Website '{website_type}' must have at least 'title' or 'content' field" # Validate field types string_fields = ['article_links', 'page_links', 'title', 'content', 'date', 'navigation_selector', 'navigation_url_addition', 'recaptcha_text'] for field in string_fields: if field in website_config: value = website_config[field] # Allow string, None, or list (for content field) if value is not None and not isinstance(value, (str, list)): return False, f"Field '{field}' in '{website_type}' must be string, list, or null" # Validate start_page (must be integer >= 0) if 'start_page' in website_config: start_page = website_config['start_page'] if start_page is not None: try: start_page_int = int(start_page) if start_page_int < 0: return False, f"'start_page' in '{website_type}' must be >= 0" except (ValueError, TypeError): return False, f"'start_page' in '{website_type}' must be an integer" # Validate array fields array_fields = ['pdf_links', 'file_links'] for field in array_fields: if field in website_config: value = website_config[field] if value is not None: if isinstance(value, str): # Allow string, will be converted to array pass elif not isinstance(value, list): return False, f"Field '{field}' in '{website_type}' must be a list or null" return True, "Configuration is valid" except Exception as e: return False, f"Validation error: {str(e)}" def save_website_config(config_data: dict) -> tuple[bool, str]: """ Save validated website configuration to file Args: config_data: Configuration dictionary to save Returns: Tuple of (success, message) """ global WEBSITE_CONFIG try: # Validate the structure first is_valid, error_message = validate_website_config(config_data) if not is_valid: return False, f"Invalid configuration: {error_message}" # Save to file with open('website_config.json', 'w', encoding='utf-8') as f: json.dump(config_data, f, indent=4, ensure_ascii=False) # Reload the global config WEBSITE_CONFIG = load_website_config() logger.info("✅ Website configuration saved successfully") return True, "Website configuration saved successfully" except Exception as e: error_msg = f"Error saving website config: {str(e)}" logger.error(f"❌ {error_msg}") return False, error_msg def set_document_only_mode(value: bool): """Set the global document-only mode flag.""" global DOCUMENT_ONLY_MODE DOCUMENT_ONLY_MODE = value def is_document_mode_enabled() -> bool: """Check if document-only mode is enabled.""" return DOCUMENT_ONLY_MODE def set_scraping_cancelled(value: bool): """Set the global cancellation flag""" global _scraping_cancelled _scraping_cancelled = value def scraping_cancelled() -> bool: """Check if scraping has been cancelled""" return _scraping_cancelled def get_captcha_status(): """Get the current captcha status message""" global _captcha_status return _captcha_status def set_captcha_status(status: str): """Set the captcha status message""" global _captcha_status _captcha_status = status def clear_captcha_status(): """Clear the captcha status""" global _captcha_status _captcha_status = None async def force_close_browser(): """Force close browser and page instances""" global current_browser, current_page try: if current_page: await current_page.close() current_page = None if current_browser: await current_browser.close() current_browser = None except Exception as e: logger.error(f"Error closing browser: {str(e)}") def convert_to_absolute_url(href: str, base_url: str) -> str: """ Convert relative URL to absolute URL """ if href.startswith(('http://', 'https://')): return href return urljoin(base_url, href) def ensure_archive_directory(): """Ensure archive directory exists""" if not os.path.exists(ARCHIVE_DIR): os.makedirs(ARCHIVE_DIR) logger.info(f"📁 Created archive directory: {ARCHIVE_DIR}") async def scrape_news_async(url: str, website_type: str, custom_keywords: str = "", start_date: str = None, end_date: str = None, force_mode: str = None) -> List[dict]: """ Main entry point for scraping - delegates to appropriate scraper Args: url: URL to scrape website_type: Website type identifier custom_keywords: Custom keywords for filtering start_date: Optional start date for filtering end_date: Optional end date for filtering force_mode: Force scraper mode - "text" for text scraper, "document" for document scraper, None for auto-detect """ try: logger.info(f"🚀 Starting scraping for {website_type} at {url}") # Determine which scraper to use use_document_scraper = False if force_mode == "text": # Force text scraper use_document_scraper = False logger.info(f"📰 Forcing text scraper mode for {website_type}") elif force_mode == "document": # Force document scraper use_document_scraper = True logger.info(f"📄 Forcing document scraper mode for {website_type}") else: # Auto-detect based on config (backward compatible) pdf_websites = get_pdf_websites() use_document_scraper = website_type in pdf_websites if use_document_scraper: logger.info(f"📄 Auto-detected: Using document scraper for {website_type}") else: logger.info(f"📰 Auto-detected: Using text scraper for {website_type}") # Import the appropriate scraper if use_document_scraper: # Document-focused sites from document_scraper import extract_document_content_unified, download_all_pdfs_from_page else: # Text-focused sites from text_scraper import extract_article_content_unified, get_all_article_links_unified, extract_all_articles_unified # Get website configuration config = WEBSITE_CONFIG.get(website_type) if not config: logger.error(f"❌ No configuration found for website type: {website_type}") return [{ "title": "Configuration Error", "content": f"No configuration found for website type: {website_type}", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }] # Initialize browser async with async_playwright() as p: # CHANGED: use hardened, headless launch to avoid X server errors browser = await p.chromium.launch(**PLAYWRIGHT_LAUNCH_KW) page = await browser.new_page() # Block ads, CSS, and images for better performance await page.route("**/*", lambda route: ( route.abort() if any(blocked in route.request.url.lower() for blocked in [ # Ad domains "googleads", "doubleclick", "googlesyndication", "google-analytics", "facebook.com/tr", "googletagmanager", "amazon-adsystem", "adsystem", "googletagservices", "ads.yahoo.com", "googletagservices", # CSS files ".css", "stylesheet", "font-awesome", "bootstrap.css", # Images ".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg", ".ico", "image/", "img/", "images/", "photos/", "pictures/", # Fonts ".woff", ".woff2", ".ttf", ".eot", "fonts/", "font/", # Videos and media ".mp4", ".avi", ".mov", ".wmv", ".flv", "video/", "media/", # Analytics and tracking "analytics", "tracking", "metrics", "stats", "telemetry" ]) else route.continue_() )) # Store browser instance for cancellation global current_browser, current_page current_browser = browser current_page = page try: # Navigate to the main page with retry logic (5 attempts) max_retries = 5 retry_count = 0 page_loaded = False while retry_count < max_retries and not page_loaded: try: retry_count += 1 logger.info(f"🔄 Loading website (attempt {retry_count}/{max_retries}): {url}") # Navigate with different strategies based on attempt if retry_count == 1: # First attempt: Use domcontentloaded for faster loading await page.goto(url, wait_until="domcontentloaded", timeout=30000) elif retry_count == 2: # Second attempt: Use basic loading await page.goto(url, timeout=20000) elif retry_count == 3: # Third attempt: Use networkidle await page.goto(url, wait_until="networkidle", timeout=15000) else: # Fourth and fifth attempts: Try with shorter timeouts await page.goto(url, timeout=10000) logger.info(f"✅ Successfully loaded website on attempt {retry_count}") page_loaded = True except Exception as e: logger.warning(f"⚠️ Attempt {retry_count} failed for {url}: {str(e)}") if retry_count >= max_retries: logger.error(f"❌ Failed to load website after {max_retries} attempts: {url}") return [{ "title": "WEBSITE_LOAD_ERROR", "content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts: {str(e)}", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }] # Wait before retry await asyncio.sleep(2) if not page_loaded: return [{ "title": "WEBSITE_LOAD_ERROR", "content": f"Website is not working. Please try again later. Failed to access website after {max_retries} attempts", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }] # Check for captcha on initial page load if use_document_scraper: from document_scraper import check_and_wait_for_recaptcha captcha_result = await check_and_wait_for_recaptcha(page, config) if captcha_result == "CAPTCHA_TIMEOUT": logger.error("❌ Captcha detected but not solved within timeout period") return [{ "title": "CAPTCHA_ERROR", "content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }] # Delegate to appropriate scraper based on determined mode if use_document_scraper: # Document processing all_articles = await download_all_pdfs_from_page(page, url, config, website_type, start_date, end_date) else: # Text processing all_article_links = await get_all_article_links_unified(page, url, config, website_type) if not all_article_links: return [{ "title": "No articles found", "content": "No articles were found on the specified page", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }] # Extract content from all articles all_articles = await extract_all_articles_unified(page, all_article_links, config, website_type, custom_keywords, start_date, end_date) return all_articles finally: # Clean up browser await browser.close() current_browser = None current_page = None except Exception as e: logger.error(f"❌ Error in main scraping function: {str(e)}") return [{ "title": "Scraping Error", "content": f"Error during scraping: {str(e)}", "date": datetime.now().strftime("%Y-%m-%d"), "url": url }]