#!/usr/bin/env python3 """ Uses unified pipeline for both text and document processing """ import gradio as gr import pandas as pd from datetime import datetime import os from typing import List, Dict, Any, Tuple, Optional import tempfile import logging import sys import subprocess import platform from unified_pipeline import process_text_content, process_document_content from scraper_common import scrape_news_async, set_scraping_cancelled, force_close_browser, scraping_cancelled from auth import auth_manager # --- Playwright bootstrap: install Chromium at runtime if missing --- import os, glob, subprocess, pathlib # Use the canonical path HF Spaces expect in root containers import os, glob, subprocess # Ensure path Playwright expects import os import subprocess # Make sure Playwright knows where to install browsers (HF standard) os.environ["PLAYWRIGHT_BROWSERS_PATH"] = "/root/.cache/ms-playwright" # Ensure Chromium is installed at runtime def ensure_chromium(): try: subprocess.run( ["playwright", "install", "--with-deps", "chromium"], check=True ) except Exception as e: print("Playwright install failed:", e) ensure_chromium() # Configure detailed logging for the app logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) # Global variables for cancellation document_processing_cancelled = False # Global variables for authentication current_user = None current_session = None def clear_memory_state(): """ Clear all memory state and global variables to free up memory This function should be called before starting new processing operations """ global document_processing_cancelled logger.info("๐Ÿงน Clearing memory state...") # Reset cancellation flags document_processing_cancelled = False set_scraping_cancelled(False) # Reset global PDF counter from scraper_common import reset_global_pdf_count reset_global_pdf_count() # Clear timeout URLs set from scraper_common import TIMEOUT_URLS TIMEOUT_URLS.clear() # Force close any open browser instances try: import asyncio import threading def close_browser_async(): try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(force_close_browser()) loop.close() except Exception as e: logger.debug(f"Browser already closed or error closing: {e}") # Close browser in background thread to avoid blocking browser_close_thread = threading.Thread(target=close_browser_async) browser_close_thread.start() except Exception as e: logger.debug(f"Error closing browser during memory clear: {e}") # Reset pipeline statistics if pipeline exists try: from unified_pipeline import get_pipeline pipeline = get_pipeline() if pipeline: pipeline.reset_stats() logger.debug("Pipeline statistics reset") except Exception as e: logger.debug(f"Error resetting pipeline stats: {e}") # Force garbage collection import gc gc.collect() logger.info("โœ… Memory state cleared successfully") # Authentication functions def login_user(username: str, password: str) -> Tuple[bool, str]: """Login user and return (success, message)""" global current_user, current_session success, session_token = auth_manager.authenticate_user(username, password) if success: current_user = username current_session = session_token return True, f"Welcome, {username}!" else: return False, "Invalid username or password" def logout_user() -> str: """Logout current user""" global current_user, current_session if current_session: auth_manager.logout_user(current_session) current_user = None current_session = None return "Logged out successfully" def is_authenticated() -> bool: """Check if user is authenticated""" global current_user, current_session if not current_user or not current_session: return False # Validate session valid, username = auth_manager.validate_session(current_session) if not valid: current_user = None current_session = None return False return True def get_current_user() -> Optional[str]: """Get current authenticated user""" if is_authenticated(): return current_user return None def require_auth(func): """Decorator to require authentication for functions""" def wrapper(*args, **kwargs): if not is_authenticated(): return None, "Please login to access this feature" return func(*args, **kwargs) return wrapper # Ensure archive directory exists def ensure_archive_directory(): """Ensure archive directory exists""" archive_dir = "archive" if not os.path.exists(archive_dir): os.makedirs(archive_dir) logger.info(f"๐Ÿ“ Created archive directory: {archive_dir}") return archive_dir def create_csv_download(df: pd.DataFrame, filename_prefix: str = "data") -> str: """ Create a CSV file from DataFrame and return the file path """ if df.empty: # Create empty CSV with headers empty_df = pd.DataFrame(columns=df.columns if not df.empty else ['#', 'title', 'content', 'summary', 'summary_somali', 'date', 'url']) csv_content = empty_df.to_csv(index=False) else: csv_content = df.to_csv(index=False) # Create temporary file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{filename_prefix}_{timestamp}.csv" with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False, encoding='utf-8') as f: f.write(csv_content) temp_path = f.name return temp_path def save_csv_to_archive(df: pd.DataFrame, source: str, filename_prefix: str = "data") -> str: """ Save CSV file to archive folder organized by source + date """ # Create archive directory structure today = datetime.now().strftime("%Y-%m-%d") archive_dir = os.path.join("archive", source, today) os.makedirs(archive_dir, exist_ok=True) # Create filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{filename_prefix}_{timestamp}.csv" csv_path = os.path.join(archive_dir, filename) if df.empty: # Create empty CSV with headers empty_df = pd.DataFrame(columns=df.columns if not df.empty else ['#', 'title', 'content', 'summary', 'summary_somali', 'date', 'url']) empty_df.to_csv(csv_path, index=False) else: df.to_csv(csv_path, index=False) return csv_path def create_text_content_tab(): """ Create the text content tab interface """ with gr.Tab("Text Content"): gr.Markdown("## Website Content Scraper") gr.Markdown("Extract and analyze content from websites with AI-powered summarization.") with gr.Group(): gr.Markdown("### Configuration") with gr.Row(): url_input = gr.Textbox( label="Website URL", placeholder="https://example.com/article", interactive=True, scale=2 ) keywords_input = gr.Textbox( label="Filter Keywords (optional)", placeholder="e.g., flood, drought, conflict (comma-separated)", interactive=True, scale=2 ) with gr.Row(): start_date_input = gr.Textbox( label="Start Date (optional)", placeholder="YYYY-MM-DD (e.g., 2024-01-01)", interactive=True, scale=1, info="Filter articles from this date onwards" ) end_date_input = gr.Textbox( label="End Date (optional)", placeholder="YYYY-MM-DD (e.g., 2024-12-31)", interactive=True, scale=1, info="Filter articles up to this date" ) with gr.Row(): scrape_btn = gr.Button("Scrape Content", variant="primary") cancel_btn = gr.Button("Cancel", variant="stop", interactive=True, value="Cancel") clear_btn = gr.Button("Clear", variant="secondary") # Status text status_text = gr.Textbox( label="Status", value="Ready to scrape content...", interactive=False, visible=True ) # Display area for scraped content content_df = gr.Dataframe( label="Scraped Content", headers=["#", "Title", "Category", "Content", "Summary", "Summary (Somali)", "Date", "URL"], datatype=["str", "str", "str", "str", "str", "str", "str", "str"], interactive=True, wrap=True ) # Action buttons with gr.Row(): download_btn = gr.DownloadButton( label="๐Ÿ“ฅ Download CSV", variant="secondary", visible=False ) # Store full content data globally for modal access full_content_store = gr.State([]) def process_and_display(url, custom_keywords="", start_date="", end_date=""): """Process URL and display results with progress updates""" # Clear memory state before starting new processing clear_memory_state() # Clear captcha status from scraper_common import clear_captcha_status clear_captcha_status() logger.info(f"๐Ÿš€ Starting text content processing for URL: {url}") logger.info(f"๐Ÿ”‘ Custom keywords provided: {custom_keywords}") logger.debug(f"๐Ÿ“‹ Processing parameters: URL={url.strip()}") if not url.strip(): logger.warning("โš ๏ธ Empty URL provided") return pd.DataFrame(), None, "โŒ Error: Please enter a valid URL", [] try: import asyncio import threading import time # Detect website type from unified_pipeline import determine_website_type website_type = determine_website_type(url.strip()) # Check cancellation if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled before starting") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Step 1: Start scraping status_msg = f"๐Ÿ“ก Step 1/4: Starting content extraction from {website_type}..." yield pd.DataFrame(), None, status_msg, [] if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled by user before content extraction") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Create a result container and status tracker result_container = { 'df': None, 'full_content_data': None, 'error': None, 'completed': False, 'status': 'processing' } def run_async_processing(): """Run the async processing in a separate thread""" try: result_container['status'] = 'scraping' loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) df, full_content_data = loop.run_until_complete(process_text_content(url.strip(), custom_keywords, start_date.strip() if start_date else None, end_date.strip() if end_date else None)) result_container['df'] = df result_container['full_content_data'] = full_content_data result_container['status'] = 'completed' result_container['completed'] = True except Exception as e: result_container['error'] = str(e) result_container['status'] = 'error' result_container['completed'] = True finally: loop.close() # Start processing in a separate thread processing_thread = threading.Thread(target=run_async_processing) processing_thread.start() # Monitor the processing and update status status_step = 1 last_status_time = time.time() while processing_thread.is_alive(): if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled during processing") try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(force_close_browser()) loop.close() except Exception as e: logger.error(f"Error closing browser: {e}") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Check for captcha status and update UI from scraper_common import get_captcha_status captcha_status = get_captcha_status() if captcha_status: yield pd.DataFrame(), None, captcha_status, [] time.sleep(0.5) # Check every 500ms continue # Update status periodically during processing current_time = time.time() if current_time - last_status_time >= 2.0: # Update every 2 seconds if status_step == 1: status_msg = "๐Ÿ”„ Step 2/4: Extracting content from website..." yield pd.DataFrame(), None, status_msg, [] status_step = 2 last_status_time = current_time elif status_step == 2: status_msg = "๐Ÿค– Step 3/4: Processing content with AI models..." yield pd.DataFrame(), None, status_msg, [] status_step = 3 last_status_time = current_time time.sleep(0.5) # Check every 500ms # Get the result if result_container['error']: logger.error(f"โŒ Error during processing: {result_container['error']}") return pd.DataFrame(), None, f"โŒ Error: {result_container['error']}", [] df = result_container['df'] full_content_data = result_container['full_content_data'] # Check cancellation after pipeline processing if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled by user after content extraction") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Step 4: Saving to archive num_articles = len(df) if df is not None and not df.empty else 0 status_msg = f"๐Ÿ’พ Step 4/4: Saving to archive... Found {num_articles} articles" yield pd.DataFrame(), None, status_msg, [] if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled by user during archiving") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Actually save to archive if not df.empty: try: source = url.split('/')[2].replace('www.', '') if '://' in url else 'unknown' archive_path = save_csv_to_archive(df, source, "scraped_content") logger.info(f"๐Ÿ“ Saved to archive: {archive_path}") except Exception as e: logger.error(f"โŒ Error saving to archive: {str(e)}") csv_file = create_csv_download(df, "scraped_content") if not df.empty else None # Final cancellation check if scraping_cancelled(): logger.warning("โš ๏ธ Operation cancelled by user before finalizing results") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user", [] # Processing complete logger.info(f"โœ… Processing complete! Found {len(df)} articles.") final_status = f"โœ… Processing complete! Found {len(df)} articles." yield df, csv_file, final_status, full_content_data except Exception as e: # Processing complete logger.error(f"โŒ Error during text content processing: {str(e)}") logger.debug(f"๐Ÿ” Error details: {type(e).__name__}: {str(e)}") return pd.DataFrame(), None, f"Error: {str(e)}", [] def cancel_scraping(): """Cancel the scraping operation""" logger.warning("โš ๏ธ User requested cancellation of scraping operation") # Clear memory state when cancelling clear_memory_state() logger.info("๐Ÿ›‘ Set cancellation flags") # Force close browser asynchronously in a separate thread to avoid blocking import threading def close_browser_async(): import asyncio try: logger.info("๐Ÿ”ง Attempting to close browser...") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(force_close_browser()) loop.close() logger.info("โœ… Browser closed successfully") except Exception as e: logger.error(f"โŒ Error closing browser: {e}") # Start browser closing in background browser_close_thread = threading.Thread(target=close_browser_async) browser_close_thread.start() return "๐Ÿ›‘ Cancellation requested - stopping operation..." def clear_all(): """Clear URL input, keywords input, date inputs, DataFrame, and download button""" logger.info("๐Ÿงน User requested to clear all data") # Clear memory state when manually clearing clear_memory_state() return "", "", "", "", pd.DataFrame(), None, "Ready to scrape content...", [] def update_download_visibility(df): return gr.DownloadButton(visible=not df.empty) scrape_btn.click( fn=process_and_display, inputs=[url_input, keywords_input, start_date_input, end_date_input], outputs=[content_df, download_btn, status_text, full_content_store], show_progress=True ) cancel_btn.click( fn=cancel_scraping, outputs=[status_text] ) clear_btn.click( fn=clear_all, outputs=[url_input, keywords_input, start_date_input, end_date_input, content_df, download_btn, status_text, full_content_store] ) content_df.change( fn=update_download_visibility, inputs=[content_df], outputs=[download_btn] ) def create_document_content_tab(): """ Create the document content tab interface """ with gr.Tab("Document Content"): gr.Markdown("## Document Content Processor") gr.Markdown("Extract and analyze content from PDF, DOC, and CSV documents with AI-powered processing.") with gr.Group(): gr.Markdown("### Document Source") with gr.Row(): doc_url_input = gr.Textbox( label="Document URL", placeholder="https://example.com/documents/", interactive=True, scale=2 ) with gr.Row(): doc_start_date_input = gr.Textbox( label="Start Date (optional)", placeholder="YYYY-MM-DD (e.g., 2024-01-01)", interactive=True, scale=1, info="Filter documents from this date onwards" ) doc_end_date_input = gr.Textbox( label="End Date (optional)", placeholder="YYYY-MM-DD (e.g., 2024-12-31)", interactive=True, scale=1, info="Filter documents up to this date" ) with gr.Row(): process_btn = gr.Button("Process Documents", variant="primary") doc_cancel_btn = gr.Button("Cancel", variant="stop", interactive=True, value="Cancel") doc_clear_btn = gr.Button("Clear", variant="secondary") # Status text for documents doc_status_text = gr.Textbox( label="Status", value="Ready to process documents...", interactive=False, visible=True ) # Display area for document content doc_df = gr.Dataframe( label="Document Content", headers=["Title", "Date", "Source", "File Path", "Extracted Text", "Summary", "Summary (Somali)", "File Type"], datatype=["str", "str", "str", "str", "str", "str", "str", "str"], interactive=True, wrap=True ) # Action buttons with gr.Row(): doc_download_btn = gr.DownloadButton( label="๐Ÿ“ฅ Download CSV", variant="secondary", visible=False ) def process_and_display_docs(url, start_date="", end_date=""): """Process documents and display results with progress updates""" # Clear memory state before starting new processing clear_memory_state() # Clear captcha status from scraper_common import clear_captcha_status clear_captcha_status() if not url.strip(): return pd.DataFrame(), None, "โŒ Error: Please enter a valid URL" try: import asyncio import threading import time # Detect website type from unified_pipeline import determine_website_type website_type = determine_website_type(url.strip()) # Check cancellation if document_processing_cancelled: return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Step 1: Start document extraction status_msg = f"๐Ÿ“„ Step 1/4: Starting document extraction from {website_type}..." yield pd.DataFrame(), None, status_msg if document_processing_cancelled: return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Create a result container and status tracker result_container = { 'df': None, 'error': None, 'completed': False, 'status': 'processing' } def run_async_processing(): """Run the async processing in a separate thread""" try: result_container['status'] = 'extracting' loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) df = loop.run_until_complete(process_document_content(url.strip(), start_date.strip() if start_date else None, end_date.strip() if end_date else None)) result_container['df'] = df result_container['status'] = 'completed' result_container['completed'] = True except Exception as e: result_container['error'] = str(e) result_container['status'] = 'error' result_container['completed'] = True finally: loop.close() # Start processing in a separate thread processing_thread = threading.Thread(target=run_async_processing) processing_thread.start() # Monitor the processing and update status status_step = 1 last_status_time = time.time() while processing_thread.is_alive(): if document_processing_cancelled: logger.warning("โš ๏ธ Document processing cancelled during processing") try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(force_close_browser()) loop.close() except Exception as e: logger.error(f"Error closing browser: {e}") return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Check for captcha status and update UI from scraper_common import get_captcha_status captcha_status = get_captcha_status() if captcha_status: yield pd.DataFrame(), None, captcha_status time.sleep(0.5) # Check every 500ms continue # Update status periodically during processing current_time = time.time() if current_time - last_status_time >= 2.0: # Update every 2 seconds if status_step == 1: status_msg = "๐Ÿ”„ Step 2/4: Extracting documents from website..." yield pd.DataFrame(), None, status_msg status_step = 2 last_status_time = current_time elif status_step == 2: status_msg = "๐Ÿค– Step 3/4: Processing documents with AI models..." yield pd.DataFrame(), None, status_msg status_step = 3 last_status_time = current_time time.sleep(0.5) # Check every 500ms # Get the result if result_container['error']: logger.error(f"โŒ Error during document processing: {result_container['error']}") return pd.DataFrame(), None, f"โŒ Error: {result_container['error']}" df = result_container['df'] # Check cancellation after pipeline processing if document_processing_cancelled: return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Step 4: Saving to archive num_docs = len(df) if df is not None and not df.empty else 0 status_msg = f"๐Ÿ’พ Step 4/4: Saving to archive... Found {num_docs} documents" yield pd.DataFrame(), None, status_msg if document_processing_cancelled: return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Actually save to archive if not df.empty: try: source = url.split('/')[2].replace('www.', '') if '://' in url else 'unknown' archive_path = save_csv_to_archive(df, source, "document_content") logger.info(f"๐Ÿ“ Saved to archive: {archive_path}") except Exception as e: logger.error(f"โŒ Error saving to archive: {str(e)}") csv_file = create_csv_download(df, "document_content") if not df.empty else None # Final cancellation check if document_processing_cancelled: return pd.DataFrame(), None, "๐Ÿ›‘ Operation cancelled by user" # Processing complete logger.info(f"โœ… Document processing complete! Found {len(df)} documents.") final_status = f"โœ… Processing complete! Found {len(df)} documents." yield df, csv_file, final_status except Exception as e: # Processing complete return pd.DataFrame(), None, f"Error: {str(e)}" def cancel_document_processing(): """Cancel the document processing operation""" logger.warning("โš ๏ธ User requested cancellation of document processing") # Clear memory state when cancelling clear_memory_state() # Force close browser asynchronously in a separate thread to avoid blocking import threading def close_browser_async(): import asyncio try: logger.info("๐Ÿ”ง Attempting to close browser...") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(force_close_browser()) loop.close() logger.info("โœ… Browser closed successfully") except Exception as e: logger.error(f"โŒ Error closing browser: {e}") # Start browser closing in background browser_close_thread = threading.Thread(target=close_browser_async) browser_close_thread.start() return "๐Ÿ›‘ Document processing cancelled - stopping operation..." def clear_doc_all(): """Clear URL input, date inputs, DataFrame, and download button for document content""" # Clear memory state when manually clearing clear_memory_state() return "", "", "", pd.DataFrame(), None, "Ready to process documents..." process_btn.click( fn=process_and_display_docs, inputs=[doc_url_input, doc_start_date_input, doc_end_date_input], outputs=[doc_df, doc_download_btn, doc_status_text], show_progress=True ) doc_cancel_btn.click( fn=cancel_document_processing, outputs=[doc_status_text] ) doc_clear_btn.click( fn=clear_doc_all, outputs=[doc_url_input, doc_start_date_input, doc_end_date_input, doc_df, doc_download_btn, doc_status_text] ) doc_df.change( fn=lambda df: gr.DownloadButton(visible=not df.empty), inputs=[doc_df], outputs=[doc_download_btn] ) def create_archive_tab(): """ Create the archive access tab interface """ with gr.Tab("Archive Access"): gr.Markdown("## Archived Files Access") gr.Markdown("Browse, download, and manage previously processed files from the archive.") # File Management Section with gr.Row(): # CSV Files Column with gr.Column(scale=1, elem_classes="admin-section"): gr.Markdown("### CSV Files") gr.Markdown("*Processed data files*") with gr.Row(): refresh_csv_btn = gr.Button("Refresh CSV", variant="secondary", size="sm") gr.Markdown("*Update CSV file list*") csv_df = gr.Dataframe( label="", headers=["Source", "Date", "Filename", "Path"], datatype=["str", "str", "str", "str"], interactive=True, wrap=False, elem_id="csv_dataframe" ) csv_selection = gr.Dropdown( label="Select CSV File", choices=[], value=None, interactive=True ) # CSV Action buttons with gr.Row(): open_csv_btn = gr.Button("Open", variant="secondary", size="sm") delete_csv_btn = gr.Button("Delete", variant="stop", size="sm") open_csv_folder_btn = gr.Button("Folder", variant="secondary", size="sm") # PDF Files Column with gr.Column(scale=1, elem_classes="admin-section"): gr.Markdown("### PDF Files") gr.Markdown("*Downloaded documents*") with gr.Row(): refresh_pdf_btn = gr.Button("Refresh PDF", variant="secondary", size="sm") gr.Markdown("*Update PDF file list*") pdf_df = gr.Dataframe( label="", headers=["Source", "Date", "Filename", "Size", "Path"], datatype=["str", "str", "str", "str", "str"], interactive=True, wrap=False, elem_id="pdf_dataframe" ) pdf_selection = gr.Dropdown( label="Select PDF File", choices=[], value=None, interactive=True ) # PDF Action buttons with gr.Row(): open_pdf_btn = gr.Button("Open", variant="secondary", size="sm") delete_pdf_btn = gr.Button("Delete", variant="stop", size="sm") open_pdf_folder_btn = gr.Button("Folder", variant="secondary", size="sm") # Status section with gr.Column(elem_classes="admin-section"): status_text = gr.Textbox( label="Status", interactive=False, value="Ready to access archived files...", lines=2 ) # Archive functions def get_archived_csv_files(): """Get list of archived CSV files""" archive_dir = ensure_archive_directory() csv_files = [] if os.path.exists(archive_dir): for source in os.listdir(archive_dir): source_path = os.path.join(archive_dir, source) if os.path.isdir(source_path): for date in os.listdir(source_path): date_path = os.path.join(source_path, date) if os.path.isdir(date_path): for file in os.listdir(date_path): if file.endswith('.csv'): file_path = os.path.join(date_path, file) file_size = os.path.getsize(file_path) csv_files.append({ 'source': source, 'date': date, 'filename': file, 'path': file_path, 'size': f"{file_size / 1024:.2f} KB" }) return sorted(csv_files, key=lambda x: (x['source'], x['date'], x['filename']), reverse=True) def get_archived_pdf_files(): """Get list of archived PDF files""" archive_dir = ensure_archive_directory() pdf_files = [] if os.path.exists(archive_dir): for source in os.listdir(archive_dir): source_path = os.path.join(archive_dir, source) if os.path.isdir(source_path): for date in os.listdir(source_path): date_path = os.path.join(source_path, date) if os.path.isdir(date_path): # Check main date folder for file in os.listdir(date_path): if file.endswith('.pdf'): file_path = os.path.join(date_path, file) file_size = os.path.getsize(file_path) pdf_files.append({ 'source': source, 'date': date, 'filename': file, 'path': file_path, 'size': f"{file_size / 1024 / 1024:.2f} MB" }) # Check pdf subfolder pdf_folder = os.path.join(date_path, "pdf") if os.path.exists(pdf_folder): for file in os.listdir(pdf_folder): if file.endswith('.pdf'): file_path = os.path.join(pdf_folder, file) file_size = os.path.getsize(file_path) pdf_files.append({ 'source': source, 'date': date, 'filename': file, 'path': file_path, 'size': f"{file_size / 1024 / 1024:.2f} MB" }) return sorted(pdf_files, key=lambda x: (x['source'], x['date'], x['filename']), reverse=True) def refresh_csv_files(): """Refresh CSV files list""" csv_files = get_archived_csv_files() if csv_files: display_data = [ { 'Source': item['source'], 'Date': item['date'], 'Filename': item['filename'], 'Path': item['path'] } for item in csv_files ] df = pd.DataFrame(display_data) choices = [f"{item['source']} | {item['date']} | {item['filename']}" for item in csv_files] default_choice = choices[0] if choices else None return df, f"Found {len(csv_files)} CSV files. Select a file below and click 'Open Selected CSV'.", gr.update(choices=choices, value=default_choice) else: return pd.DataFrame(), "No CSV files found in the archive.", gr.update(choices=[], value=None) def refresh_pdf_files(): """Refresh PDF files list""" pdf_files = get_archived_pdf_files() if pdf_files: display_data = [ { 'Source': item['source'], 'Date': item['date'], 'Filename': item['filename'], 'Size': item['size'], 'Path': item['path'] } for item in pdf_files ] df = pd.DataFrame(display_data) choices = [f"{item['source']} | {item['date']} | {item['filename']}" for item in pdf_files] default_choice = choices[0] if choices else None return df, f"Found {len(pdf_files)} PDF files. Select a file below and click 'Open Selected PDF'.", gr.update(choices=choices, value=default_choice) else: return pd.DataFrame(), "No PDF files found in the archive.", gr.update(choices=[], value=None) def open_selected_csv(selected_option): """Open the selected CSV file""" try: if not selected_option: return "Please choose a CSV from the dropdown before clicking 'Open'." try: source, date, filename = [part.strip() for part in selected_option.split("|")] except ValueError: return "Invalid selection format. Please refresh the list and try again." for item in get_archived_csv_files(): if item['source'] == source and item['date'] == date and item['filename'] == filename: file_path = item['path'] if os.path.exists(file_path): return open_csv_file(file_path) return f"Cannot open file: {file_path}. File does not exist." return "Selected file not found. Please refresh the list." except Exception as e: return f"Error opening CSV file: {str(e)}" def open_selected_pdf(selected_option): """Open the selected PDF file""" try: if not selected_option: return "Please choose a PDF from the dropdown before clicking 'Open'." try: source, date, filename = [part.strip() for part in selected_option.split("|")] except ValueError: return "Invalid selection format. Please refresh the list and try again." for item in get_archived_pdf_files(): if item['source'] == source and item['date'] == date and item['filename'] == filename: file_path = item['path'] if os.path.exists(file_path): return open_pdf_file(file_path) return f"Cannot open file: {file_path}. File does not exist." return "Selected file not found. Please refresh the list." except Exception as e: return f"Error opening PDF file: {str(e)}" def open_csv_file(file_path: str): """Open a CSV file with the default application""" try: abs_path = os.path.abspath(file_path) # Open file based on operating system if platform.system() == "Windows": subprocess.run(["start", "", abs_path], check=True, shell=True) elif platform.system() == "Darwin": # macOS subprocess.run(["open", abs_path], check=True) else: # Linux subprocess.run(["xdg-open", abs_path], check=True) return f"Opened CSV file: {abs_path}" except Exception as e: return f"Error opening CSV file: {str(e)}" def open_pdf_file(file_path: str): """Open a PDF file with the default application""" try: abs_path = os.path.abspath(file_path) # Open file based on operating system if platform.system() == "Windows": subprocess.run(["start", "", abs_path], check=True, shell=True) elif platform.system() == "Darwin": # macOS subprocess.run(["open", abs_path], check=True) else: # Linux subprocess.run(["xdg-open", abs_path], check=True) return f"Opened PDF file: {abs_path}" except Exception as e: return f"Error opening PDF file: {str(e)}" def delete_selected_csv(selected_option): """Delete the selected CSV file""" try: if not selected_option: return "Please choose a CSV from the dropdown before clicking 'Delete'." try: source, date, filename = [part.strip() for part in selected_option.split("|")] except ValueError: return "Invalid selection format. Please refresh the list and try again." for item in get_archived_csv_files(): if item['source'] == source and item['date'] == date and item['filename'] == filename: file_path = item['path'] if os.path.exists(file_path): os.remove(file_path) return f"Successfully deleted CSV file: {filename}" return f"Cannot delete file: {file_path}. File does not exist." return "Selected file not found. Please refresh the list." except Exception as e: return f"Error deleting CSV file: {str(e)}" def delete_selected_pdf(selected_option): """Delete the selected PDF file""" try: if not selected_option: return "Please choose a PDF from the dropdown before clicking 'Delete'." try: source, date, filename = [part.strip() for part in selected_option.split("|")] except ValueError: return "Invalid selection format. Please refresh the list and try again." for item in get_archived_pdf_files(): if item['source'] == source and item['date'] == date and item['filename'] == filename: file_path = item['path'] if os.path.exists(file_path): os.remove(file_path) return f"Successfully deleted PDF file: {filename}" return f"Cannot delete file: {file_path}. File does not exist." return "Selected file not found. Please refresh the list." except Exception as e: return f"Error deleting PDF file: {str(e)}" def open_csv_folder(): """Open the CSV archive folder""" archive_dir = os.path.abspath("archive") try: if platform.system() == "Windows": subprocess.run(["explorer", archive_dir], check=True) elif platform.system() == "Darwin": # macOS subprocess.run(["open", archive_dir], check=True) else: # Linux subprocess.run(["xdg-open", archive_dir], check=True) return f"Opened archive folder: {archive_dir}" except Exception as e: return f"Error opening folder: {str(e)}" def open_pdf_folder(): """Open the PDF archive folder""" archive_dir = os.path.abspath("archive") try: if platform.system() == "Windows": subprocess.run(["explorer", archive_dir], check=True) elif platform.system() == "Darwin": # macOS subprocess.run(["open", archive_dir], check=True) else: # Linux subprocess.run(["xdg-open", archive_dir], check=True) return f"Opened archive folder: {archive_dir}" except Exception as e: return f"Error opening folder: {str(e)}" refresh_csv_btn.click( fn=refresh_csv_files, outputs=[csv_df, status_text, csv_selection] ) refresh_pdf_btn.click( fn=refresh_pdf_files, outputs=[pdf_df, status_text, pdf_selection] ) open_csv_btn.click( fn=open_selected_csv, inputs=[csv_selection], outputs=[status_text] ) open_pdf_btn.click( fn=open_selected_pdf, inputs=[pdf_selection], outputs=[status_text] ) delete_csv_btn.click( fn=delete_selected_csv, inputs=[csv_selection], outputs=[status_text] ) delete_pdf_btn.click( fn=delete_selected_pdf, inputs=[pdf_selection], outputs=[status_text] ) open_csv_folder_btn.click( fn=open_csv_folder, outputs=[status_text] ) open_pdf_folder_btn.click( fn=open_pdf_folder, outputs=[status_text] ) def create_keywords_management_tab(): """ Create the keywords management tab interface """ with gr.Tab("Keywords Management"): # Header section gr.Markdown("## Keywords Configuration") gr.Markdown("Manage keyword categories for intelligent article filtering and categorization.") # Load current keywords configuration def load_keywords_config(): """Load current keywords configuration""" try: from keyword_filter import load_keywords_config categories = load_keywords_config() return categories if categories else {} except Exception as e: logger.error(f"Error loading keywords config: {str(e)}") return {} def get_category_list(): """Get list of categories for dropdown""" categories = load_keywords_config() return list(categories.keys()) if categories else [] def get_keywords_for_category(category): """Get keywords for a specific category""" categories = load_keywords_config() if category and category in categories: return ", ".join(categories[category]) return "" def add_new_category(category_name, keywords_text): """Add a new category with keywords""" try: from keyword_filter import load_keywords_config, save_keywords_config if not category_name.strip(): return "โŒ Category name cannot be empty", gr.update(), gr.update() # Load current config categories = load_keywords_config() if not categories: categories = {} # Parse keywords keywords = [kw.strip() for kw in keywords_text.split(",") if kw.strip()] if not keywords: return "โŒ Please provide at least one keyword", gr.update(), gr.update() # Add new category categories[category_name.strip()] = keywords # Save configuration config_data = {"categories": categories} success, message = save_keywords_config(config_data) if success: return f"โœ… {message}", gr.update(choices=get_category_list(), value=category_name.strip()), gr.update() else: return f"โŒ {message}", gr.update(), gr.update() except Exception as e: logger.error(f"Error adding category: {str(e)}") return f"โŒ Error adding category: {str(e)}", gr.update(), gr.update() def update_category_keywords(category, keywords_text): """Update keywords for a category""" try: from keyword_filter import load_keywords_config, save_keywords_config if not category: return "โŒ Please select a category", gr.update() # Load current config categories = load_keywords_config() if not categories: return "โŒ No categories found", gr.update() # Parse keywords keywords = [kw.strip() for kw in keywords_text.split(",") if kw.strip()] if not keywords: return "โŒ Please provide at least one keyword", gr.update() # Update category categories[category] = keywords # Save configuration config_data = {"categories": categories} success, message = save_keywords_config(config_data) if success: return f"โœ… {message}" else: return f"โŒ {message}" except Exception as e: logger.error(f"Error updating category: {str(e)}") return f"โŒ Error updating category: {str(e)}" def delete_category(category): """Delete a category""" try: from keyword_filter import load_keywords_config, save_keywords_config if not category: return "โŒ Please select a category to delete", gr.update(), gr.update() # Load current config categories = load_keywords_config() if not categories: return "โŒ No categories found", gr.update(), gr.update() # Remove category if category in categories: del categories[category] # Save configuration config_data = {"categories": categories} success, message = save_keywords_config(config_data) if success: new_choices = get_category_list() return f"โœ… Category '{category}' deleted successfully", gr.update(choices=new_choices, value=None), gr.update() else: return f"โŒ {message}", gr.update(), gr.update() else: return f"โŒ Category '{category}' not found", gr.update(), gr.update() except Exception as e: logger.error(f"Error deleting category: {str(e)}") return f"โŒ Error deleting category: {str(e)}", gr.update(), gr.update() # Initialize with current categories initial_categories = get_category_list() # Create two-column layout with gr.Row(): # Left column - Add new category with gr.Column(scale=1): with gr.Group(): gr.Markdown("### Add New Category") gr.Markdown("*Create a new keyword category for article filtering*") new_category_name = gr.Textbox( label="Category Name", placeholder="e.g., Health / Epidemics", interactive=True, info="Enter a descriptive name for the category" ) new_category_keywords = gr.Textbox( label="Keywords (comma-separated)", placeholder="e.g., cholera, malaria, covid, outbreak", lines=4, interactive=True, info="Enter keywords separated by commas." ) add_category_btn = gr.Button("Add Category", variant="primary", size="lg") # Right column - Edit existing category with gr.Column(scale=1): with gr.Group(): gr.Markdown("### Edit Existing Category") gr.Markdown("*Modify or delete existing keyword categories*") category_dropdown = gr.Dropdown( label="Select Category", choices=initial_categories, interactive=True, value=initial_categories[0] if initial_categories else None, info="Choose a category to edit or delete" ) category_keywords = gr.Textbox( label="Keywords (comma-separated)", placeholder="Enter keywords separated by commas", lines=4, interactive=True, info="Edit the keywords for the selected category" ) with gr.Row(): update_btn = gr.Button("Update Keywords", variant="primary") delete_btn = gr.Button("Delete Category", variant="stop") # Status section gr.Markdown("---") status_display = gr.Textbox( label="Status", value="Ready to manage keywords...", interactive=False, visible=True, info="Status messages will appear here" ) # Event handlers add_category_btn.click( fn=add_new_category, inputs=[new_category_name, new_category_keywords], outputs=[status_display, category_dropdown, category_keywords] ) category_dropdown.change( fn=get_keywords_for_category, inputs=[category_dropdown], outputs=[category_keywords] ) update_btn.click( fn=update_category_keywords, inputs=[category_dropdown, category_keywords], outputs=[status_display] ) delete_btn.click( fn=delete_category, inputs=[category_dropdown], outputs=[status_display, category_dropdown, category_keywords] ) def create_admin_tab(): """ Create the admin panel tab interface """ with gr.Tab("Admin Panel") as admin_tab: gr.Markdown("## Admin Panel") gr.Markdown("Manage user accounts, permissions, and system settings.") # Create two main columns for better organization with gr.Row(): # Left column - User Management (only visible to admins) with gr.Column(scale=1, elem_classes="admin-section", visible=True) as admin_user_section: with gr.Group(elem_classes="admin-group"): gr.Markdown("### Add New User") gr.Markdown("*Create new user accounts*") new_username = gr.Textbox( label="Username", placeholder="Enter username", interactive=True ) new_password = gr.Textbox( label="Password", placeholder="Enter password", type="password", interactive=True ) is_admin = gr.Checkbox( label="Grant admin privileges", value=False, interactive=True ) add_user_btn = gr.Button("Add User", variant="primary", size="sm", elem_classes="admin-button") # Right column - Password Management with gr.Column(scale=1, elem_classes="admin-section"): with gr.Group(elem_classes="admin-group"): gr.Markdown("### Change Password") gr.Markdown("*Update your account password*") change_old_password = gr.Textbox( label="Current Password", placeholder="Enter current password", type="password", interactive=True ) change_new_password = gr.Textbox( label="New Password", placeholder="Enter new password", type="password", interactive=True ) change_password_btn = gr.Button("Change Password", variant="secondary", size="sm", elem_classes="admin-button") # User List Section - Full width with gr.Column(elem_classes="admin-section"): with gr.Group(elem_classes="admin-group"): gr.Markdown("### System Users") gr.Markdown("*View all registered users*") with gr.Row(): refresh_users_btn = gr.Button("Refresh", variant="secondary", size="sm", elem_classes="admin-button") users_df = gr.Dataframe( label="", headers=["Username", "Admin", "Created", "Last Login"], datatype=["str", "str", "str", "str"], interactive=False, wrap=True ) # Status messages - Compact admin_status = gr.Textbox( label="Status", value="Ready - Use the controls above to manage users", interactive=False, lines=2 ) def handle_add_user(username, password, admin_check): """Handle adding new user (admin only)""" if not is_authenticated() or not auth_manager.is_admin(get_current_user()): return "โŒ Access denied - Admin privileges required", pd.DataFrame(), gr.update(value=""), gr.update(value=""), gr.update(value=False), gr.update(visible=False) if not username or not password: return "โŒ Please enter both username and password", pd.DataFrame(), gr.update(value=""), gr.update(value=""), gr.update(value=False), gr.update(visible=True) success = auth_manager.add_user(username, password, admin_check) if success: # Refresh user list users = auth_manager.list_users() user_data = [] for user, info in users.items(): user_data.append({ 'Username': user, 'Admin': 'Yes' if info.get('is_admin', False) else 'No', 'Created': info.get('created_at', 'Unknown'), 'Last Login': info.get('last_login', 'Never') }) df = pd.DataFrame(user_data) return f"โœ… User '{username}' added successfully", df, gr.update(value=""), gr.update(value=""), gr.update(value=False), gr.update(visible=True) else: return f"โŒ Failed to add user '{username}' (user may already exist)", pd.DataFrame(), gr.update(value=""), gr.update(value=""), gr.update(value=False), gr.update(visible=True) def handle_change_password(old_password, new_password): """Handle password change""" if not is_authenticated(): return "โŒ Please login first", gr.update(value=""), gr.update(value="") if not old_password or not new_password: return "โŒ Please enter both current and new password", gr.update(value=""), gr.update(value="") success = auth_manager.change_password(get_current_user(), old_password, new_password) if success: return "โœ… Password changed successfully", gr.update(value=""), gr.update(value="") else: return "โŒ Failed to change password (check current password)", gr.update(value=""), gr.update(value="") def refresh_users(): """Refresh the user list""" if not is_authenticated() or not auth_manager.is_admin(get_current_user()): return pd.DataFrame(), "โŒ Access denied - Admin privileges required" users = auth_manager.list_users() user_data = [] for user, info in users.items(): user_data.append({ 'Username': user, 'Admin': 'Yes' if info.get('is_admin', False) else 'No', 'Created': info.get('created_at', 'Unknown'), 'Last Login': info.get('last_login', 'Never') }) df = pd.DataFrame(user_data) return df, f"โœ… User list refreshed - {len(users)} users found" def refresh_users_and_check_admin(): """Refresh users and check admin status""" # Check admin status is_admin = is_authenticated() and auth_manager.is_admin(get_current_user()) # Get users if admin if is_admin: users = auth_manager.list_users() user_data = [] for user, info in users.items(): user_data.append({ 'Username': user, 'Admin': 'Yes' if info.get('is_admin', False) else 'No', 'Created': info.get('created_at', 'Unknown'), 'Last Login': info.get('last_login', 'Never') }) df = pd.DataFrame(user_data) return df, f"โœ… User list refreshed - {len(users)} users found", gr.update(visible=True) else: return pd.DataFrame(), "โŒ Access denied - Admin privileges required", gr.update(visible=False) def check_admin_status(): """Check if current user is admin and show/hide admin user section""" if is_authenticated() and auth_manager.is_admin(get_current_user()): return gr.update(visible=True) else: return gr.update(visible=False) def initialize_admin_panel(): """Initialize admin panel when tab loads""" if is_authenticated() and auth_manager.is_admin(get_current_user()): # Get users list for admin users = auth_manager.list_users() user_data = [] for user, info in users.items(): user_data.append({ 'Username': user, 'Admin': 'Yes' if info.get('is_admin', False) else 'No', 'Created': info.get('created_at', 'Unknown'), 'Last Login': info.get('last_login', 'Never') }) df = pd.DataFrame(user_data) return df, f"โœ… Admin panel loaded - {len(users)} users found", gr.update(visible=True) else: return pd.DataFrame(), "โŒ Access denied - Admin privileges required", gr.update(visible=False) # Event handlers add_user_btn.click( fn=handle_add_user, inputs=[new_username, new_password, is_admin], outputs=[admin_status, users_df, new_username, new_password, is_admin, admin_user_section] ) change_password_btn.click( fn=handle_change_password, inputs=[change_old_password, change_new_password], outputs=[admin_status, change_old_password, change_new_password] ) refresh_users_btn.click( fn=refresh_users_and_check_admin, outputs=[users_df, admin_status, admin_user_section] ) # Initialize admin panel when tab loads admin_tab.select( fn=initialize_admin_panel, outputs=[users_df, admin_status, admin_user_section] ) def create_website_config_tab(): """ Create the website configuration management tab interface """ with gr.Tab("Website Config"): # Header section gr.Markdown("## Website Configuration Management") gr.Markdown("Configure and manage CSS selectors for website scraping. Customize how content is extracted.") # Load current website configuration def load_website_config(): """Load current website configuration""" try: from scraper_common import load_website_config config = load_website_config() return config if config else {} except Exception as e: logger.error(f"Error loading website config: {str(e)}") return {} def get_website_list(): """Get list of website types for dropdown""" config = load_website_config() return list(config.keys()) if config else [] def get_config_for_website(website_type, current_state=None): """Get configuration for a specific website""" # Use state if available, otherwise load from file config = current_state if current_state else load_website_config() if website_type and website_type in config: website_config = config[website_type] # Convert arrays to comma-separated strings for display pdf_links = website_config.get('pdf_links', []) if isinstance(pdf_links, list): pdf_links_str = ", ".join(pdf_links) if pdf_links else "" else: pdf_links_str = str(pdf_links) if pdf_links else "" file_links = website_config.get('file_links', []) if isinstance(file_links, list): file_links_str = ", ".join(file_links) if file_links else "" else: file_links_str = str(file_links) if file_links else "" # Handle content field (can be string or array) content = website_config.get('content', '') if isinstance(content, list): content_str = ", ".join(content) if content else "" else: content_str = str(content) if content else "" return ( website_config.get('base_url', '') or '', website_config.get('article_links', '') or '', website_config.get('page_links', '') or '', website_config.get('title', '') or '', content_str, website_config.get('date', '') or '', website_config.get('navigation_selector', '') or '', website_config.get('navigation_url_addition', '') or '', str(website_config.get('start_page', 0)) if website_config.get('start_page') is not None else '0', pdf_links_str, file_links_str, website_config.get('recaptcha_text', '') or '' ) return ('', '', '', '', '', '', '', '', '0', '', '', '') # Initialize with current websites initial_websites = get_website_list() # Create layout with three sections with gr.Row(): # Left column - Website selection and actions with gr.Column(scale=1): with gr.Group(): gr.Markdown("### Select Website") gr.Markdown("*Choose a website to edit or delete*") website_dropdown = gr.Dropdown( label="Website Type", choices=initial_websites, interactive=True, value=initial_websites[0] if initial_websites else None, info="Select a website configuration to edit" ) with gr.Row(): delete_website_btn = gr.Button("Delete Website", variant="stop") with gr.Group(): gr.Markdown("### Add New Website") gr.Markdown("*Create a new website configuration*") new_website_type = gr.Textbox( label="Website Type Name", placeholder="e.g., newsite", interactive=True, info="Enter a unique identifier (no spaces)" ) add_website_btn = gr.Button("Add New Website", variant="primary") # Right column - Configuration form with gr.Column(scale=2): gr.Markdown("### Configuration Fields") gr.Markdown("*Edit the configuration fields below*") # Required fields with gr.Group(): gr.Markdown("**Required Fields**") base_url_field = gr.Textbox( label="Base URL", placeholder="e.g., https://example.com", interactive=True, info="Base URL of the website (required)" ) title_field = gr.Textbox( label="Title Selector", placeholder="e.g., h1, .title, #article-title", interactive=True, info="CSS selector for article title (required)" ) content_field = gr.Textbox( label="Content Selector", placeholder="e.g., .content, p, #main-body", interactive=True, lines=2, info="CSS selector for article content (required). For multiple selectors, use comma-separated values." ) # Optional fields with gr.Group(): gr.Markdown("**Optional Fields**") article_links_field = gr.Textbox( label="Article Links Selector", placeholder="e.g., .article-link a, h2 a", interactive=True, info="CSS selector for article links on listing pages" ) page_links_field = gr.Textbox( label="Page Links Selector", placeholder="e.g., .page-link a", interactive=True, info="CSS selector for page links (for document sites)" ) date_field = gr.Textbox( label="Date Selector", placeholder="e.g., .date, time, .published", interactive=True, info="CSS selector for publication date" ) navigation_selector_field = gr.Textbox( label="Navigation Selector", placeholder="e.g., .pagination, .nav-links", interactive=True, info="CSS selector for pagination navigation" ) navigation_url_addition_field = gr.Textbox( label="Navigation URL Addition", placeholder="e.g., ?page={page_no}, /page/{page_no}/", interactive=True, info="URL pattern for pagination (use {page_no} as placeholder)" ) start_page_field = gr.Textbox( label="Start Page", placeholder="0 or 1", interactive=True, value="0", info="Starting page number (0 or 1)" ) pdf_links_field = gr.Textbox( label="PDF Links Selectors", placeholder="e.g., a[href$='.pdf'], .pdf-link", interactive=True, lines=2, info="CSS selectors for PDF links (comma-separated for multiple)" ) file_links_field = gr.Textbox( label="File Links Selectors", placeholder="e.g., a[href$='.csv'], .file-link", interactive=True, lines=2, info="CSS selectors for file links (comma-separated for multiple)" ) recaptcha_text_field = gr.Textbox( label="Recaptcha Text", placeholder="e.g., Let's confirm you are human", interactive=True, info="Text to look for when recaptcha is present" ) with gr.Row(): update_website_btn = gr.Button("Update Website", variant="primary") save_all_btn = gr.Button("Save All Changes", variant="primary") cancel_btn = gr.Button("Cancel", variant="secondary") # State to track unsaved changes unsaved_config_state = gr.State(value={}) def add_new_website(website_type, base_url, article_links, page_links, title, content, date, navigation_selector, navigation_url_addition, start_page, pdf_links, file_links, recaptcha_text, current_state): """Add a new website configuration""" try: if not website_type or not website_type.strip(): return gr.update(), gr.update() website_type = website_type.strip() # Validate website type name if ' ' in website_type: return gr.update(), gr.update() # Load current config or use state config = current_state if current_state else load_website_config() if not config: config = {} # Check if website already exists if website_type in config: return gr.update(), gr.update() # Validate required fields if not title and not content: return gr.update(), gr.update() if not base_url or not base_url.strip(): return gr.update(), gr.update() # Build config object new_config = {} # Add base_url (required) new_config['base_url'] = base_url.strip() # Add fields if provided if article_links.strip(): new_config['article_links'] = article_links.strip() if page_links.strip(): new_config['page_links'] = page_links.strip() if title.strip(): new_config['title'] = title.strip() if content.strip(): # Check if content is comma-separated (multiple selectors) content_vals = [c.strip() for c in content.split(',') if c.strip()] if len(content_vals) > 1: new_config['content'] = content_vals else: new_config['content'] = content.strip() if date.strip(): new_config['date'] = date.strip() if navigation_selector.strip(): new_config['navigation_selector'] = navigation_selector.strip() else: new_config['navigation_selector'] = None if navigation_url_addition.strip(): new_config['navigation_url_addition'] = navigation_url_addition.strip() else: new_config['navigation_url_addition'] = None if start_page.strip(): try: new_config['start_page'] = int(start_page.strip()) except ValueError: return gr.update(), gr.update() else: new_config['start_page'] = 0 # Handle array fields if pdf_links.strip(): pdf_list = [p.strip() for p in pdf_links.split(',') if p.strip()] new_config['pdf_links'] = pdf_list if file_links.strip(): file_list = [f.strip() for f in file_links.split(',') if f.strip()] new_config['file_links'] = file_list if recaptcha_text.strip(): new_config['recaptcha_text'] = recaptcha_text.strip() # Add to config config[website_type] = new_config # Store in state (not saved yet) website_list = list(config.keys()) return (gr.update(choices=website_list, value=website_type), config) except Exception as e: logger.error(f"Error adding website: {str(e)}") return gr.update(), gr.update() def update_website(website_type, base_url, article_links, page_links, title, content, date, navigation_selector, navigation_url_addition, start_page, pdf_links, file_links, recaptcha_text, current_state): """Update an existing website configuration""" try: if not website_type: return gr.update() # Load current config or use state config = current_state if current_state else load_website_config() if not config: config = {} if website_type not in config: return gr.update() # Validate required fields if not title and not content: return gr.update() if not base_url or not base_url.strip(): return gr.update() # Start with existing config to preserve fields existing_config = config.get(website_type, {}) updated_config = existing_config.copy() # Update base_url (required) updated_config['base_url'] = base_url.strip() # Update fields if provided if article_links.strip(): updated_config['article_links'] = article_links.strip() elif 'article_links' in updated_config: del updated_config['article_links'] if page_links.strip(): updated_config['page_links'] = page_links.strip() elif 'page_links' in updated_config: del updated_config['page_links'] if title.strip(): updated_config['title'] = title.strip() if content.strip(): # Check if content is comma-separated (multiple selectors) content_vals = [c.strip() for c in content.split(',') if c.strip()] if len(content_vals) > 1: updated_config['content'] = content_vals else: updated_config['content'] = content.strip() if date.strip(): updated_config['date'] = date.strip() elif 'date' in updated_config: del updated_config['date'] if navigation_selector.strip(): updated_config['navigation_selector'] = navigation_selector.strip() else: updated_config['navigation_selector'] = None if navigation_url_addition.strip(): updated_config['navigation_url_addition'] = navigation_url_addition.strip() else: updated_config['navigation_url_addition'] = None if start_page.strip(): try: updated_config['start_page'] = int(start_page.strip()) except ValueError: return gr.update() else: updated_config['start_page'] = 0 # Handle array fields if pdf_links.strip(): pdf_list = [p.strip() for p in pdf_links.split(',') if p.strip()] updated_config['pdf_links'] = pdf_list elif 'pdf_links' in updated_config: del updated_config['pdf_links'] if file_links.strip(): file_list = [f.strip() for f in file_links.split(',') if f.strip()] updated_config['file_links'] = file_list elif 'file_links' in updated_config: del updated_config['file_links'] if recaptcha_text.strip(): updated_config['recaptcha_text'] = recaptcha_text.strip() elif 'recaptcha_text' in updated_config: del updated_config['recaptcha_text'] # Update config config[website_type] = updated_config return config except Exception as e: logger.error(f"Error updating website: {str(e)}") return gr.update() def delete_website(website_type, current_state): """Delete a website configuration""" try: if not website_type: return gr.update(), gr.update() # Load current config or use state config = current_state if current_state else load_website_config() if not config: return gr.update(), gr.update() if website_type not in config: return gr.update(), gr.update() # Remove website del config[website_type] # Update dropdown choices website_list = list(config.keys()) return (gr.update(choices=website_list, value=website_list[0] if website_list else None), config) except Exception as e: logger.error(f"Error deleting website: {str(e)}") return gr.update(), gr.update() def save_all_changes(current_state): """Save all changes to file""" try: from scraper_common import save_website_config # Use current state or load from file config = current_state if current_state else load_website_config() if not config: return gr.update(), {} # Save configuration success, message = save_website_config(config) if success: # Reload to get updated list updated_config = load_website_config() website_list = list(updated_config.keys()) return (gr.update(choices=website_list), {}) else: return (gr.update(), current_state) except Exception as e: logger.error(f"Error saving configuration: {str(e)}") return gr.update(), current_state def cancel_changes(): """Cancel changes and reload from file""" try: # Reload from file config = load_website_config() website_list = list(config.keys()) # Reset form if website is selected if website_list: form_values = get_config_for_website(website_list[0]) return (gr.update(choices=website_list, value=website_list[0]), form_values[0], # base_url form_values[1], # article_links form_values[2], # page_links form_values[3], # title form_values[4], # content form_values[5], # date form_values[6], # navigation_selector form_values[7], # navigation_url_addition form_values[8], # start_page form_values[9], # pdf_links form_values[10], # file_links form_values[11], # recaptcha_text {}) else: return (gr.update(choices=[]), '', '', '', '', '', '', '', '', '0', '', '', '', {}) except Exception as e: logger.error(f"Error cancelling changes: {str(e)}") return (gr.update(), '', '', '', '', '', '', '', '', '0', '', '', '', {}) # Event handlers website_dropdown.change( fn=get_config_for_website, inputs=[website_dropdown, unsaved_config_state], outputs=[base_url_field, article_links_field, page_links_field, title_field, content_field, date_field, navigation_selector_field, navigation_url_addition_field, start_page_field, pdf_links_field, file_links_field, recaptcha_text_field] ) add_website_btn.click( fn=add_new_website, inputs=[new_website_type, base_url_field, article_links_field, page_links_field, title_field, content_field, date_field, navigation_selector_field, navigation_url_addition_field, start_page_field, pdf_links_field, file_links_field, recaptcha_text_field, unsaved_config_state], outputs=[website_dropdown, unsaved_config_state] ) update_website_btn.click( fn=update_website, inputs=[website_dropdown, base_url_field, article_links_field, page_links_field, title_field, content_field, date_field, navigation_selector_field, navigation_url_addition_field, start_page_field, pdf_links_field, file_links_field, recaptcha_text_field, unsaved_config_state], outputs=[unsaved_config_state] ) delete_website_btn.click( fn=delete_website, inputs=[website_dropdown, unsaved_config_state], outputs=[website_dropdown, unsaved_config_state] ) save_all_btn.click( fn=save_all_changes, inputs=[unsaved_config_state], outputs=[website_dropdown, unsaved_config_state] ) cancel_btn.click( fn=cancel_changes, outputs=[website_dropdown, base_url_field, article_links_field, page_links_field, title_field, content_field, date_field, navigation_selector_field, navigation_url_addition_field, start_page_field, pdf_links_field, file_links_field, recaptcha_text_field, unsaved_config_state] ) def create_main_app(): """ Create the main application with authentication flow """ with gr.Blocks( title="Raagsan Dashboard Web Scrapping", theme=gr.themes.Soft(), css=""" /* Global Container Styles */ .gradio-container { max-width: 1400px !important; margin: 0 auto !important; width: 100% !important; padding: 20px !important; min-height: 100vh !important; } /* Ensure all tabs use full width */ .tabs > .tab-nav, .tabs > .tabitem { max-width: 1400px !important; width: 100% !important; } /* Tab Navigation Styling */ .tab-nav button { border: 2px solid var(--border-color-primary) !important; border-radius: 10px 10px 0 0 !important; margin-right: 5px !important; padding: 12px 24px !important; font-weight: 600 !important; transition: all 0.3s ease !important; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1) !important; } .tab-nav button[aria-selected="true"] { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; border-color: #667eea !important; box-shadow: 0 4px 8px rgba(102, 126, 234, 0.3) !important; } .tab-nav button:hover { transform: translateY(-2px) !important; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15) !important; } /* Tab Content Container */ .tabitem { border: 2px solid var(--border-color-primary) !important; border-radius: 0 10px 10px 10px !important; padding: 30px !important; box-shadow: 0 10px 30px rgba(0, 0, 0, 0.15) !important; margin-top: 0 !important; } /* Ensure rows and columns in all tabs expand to full width */ .gradio-row { width: 100% !important; gap: 20px !important; margin-bottom: 15px !important; } .gradio-column { width: 100% !important; } /* Card Style for Sections */ .gradio-group { border: 2px solid var(--border-color-primary) !important; border-radius: 12px !important; padding: 25px !important; margin: 15px 0 !important; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1) !important; transition: all 0.3s ease !important; } .gradio-group:hover { box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2) !important; border-color: #667eea !important; } /* Input Fields Styling */ .gradio-textbox input, .gradio-textbox textarea { border: 2px solid var(--border-color-primary) !important; border-radius: 8px !important; padding: 12px !important; font-size: 14px !important; transition: all 0.3s ease !important; } .gradio-textbox input:focus, .gradio-textbox textarea:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.2) !important; outline: none !important; } /* Dropdown Styling */ .gradio-dropdown { border-radius: 8px !important; } .gradio-dropdown > div { border: 2px solid var(--border-color-primary) !important; border-radius: 8px !important; transition: all 0.3s ease !important; } .gradio-dropdown > div:focus-within { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.2) !important; } /* Button Styling */ button { border-radius: 8px !important; padding: 10px 24px !important; font-weight: 600 !important; transition: all 0.3s ease !important; border: none !important; } button:hover { transform: translateY(-2px) !important; box-shadow: 0 6px 12px rgba(0, 0, 0, 0.15) !important; } button:active { transform: translateY(0) !important; } /* Primary Button */ button[variant="primary"] { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; box-shadow: 0 4px 8px rgba(102, 126, 234, 0.3) !important; } /* Secondary Button */ button[variant="secondary"] { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%) !important; color: white !important; box-shadow: 0 4px 8px rgba(245, 87, 108, 0.3) !important; } /* Stop/Danger Button */ button[variant="stop"] { background: linear-gradient(135deg, #fa709a 0%, #fee140 100%) !important; color: #333 !important; box-shadow: 0 4px 8px rgba(250, 112, 154, 0.3) !important; } /* Dataframe Styling */ .gradio-dataframe { border: 2px solid var(--border-color-primary) !important; border-radius: 12px !important; overflow: hidden !important; box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15) !important; } .gradio-dataframe table { border-collapse: separate !important; border-spacing: 0 !important; } .gradio-dataframe th { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; padding: 15px !important; font-weight: 600 !important; text-transform: uppercase !important; font-size: 12px !important; letter-spacing: 0.5px !important; border: 1px solid #667eea !important; } .gradio-dataframe td { padding: 12px 15px !important; border: 1px solid var(--border-color-primary) !important; } .gradio-dataframe tr:hover { background-color: rgba(102, 126, 234, 0.1) !important; } /* Markdown Headings */ h2 { font-weight: 700 !important; margin-bottom: 10px !important; font-size: 24px !important; } h3 { font-weight: 600 !important; margin-bottom: 8px !important; font-size: 18px !important; } /* Login Container */ .login-container { max-width: 500px !important; margin: 50px auto !important; padding: 40px !important; border-radius: 20px !important; border: 2px solid var(--border-color-primary) !important; box-shadow: 0 20px 60px rgba(0, 0, 0, 0.2) !important; } /* Dashboard Header */ .dashboard-header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; color: white !important; padding: 25px !important; border-radius: 15px !important; margin-bottom: 25px !important; box-shadow: 0 10px 30px rgba(102, 126, 234, 0.3) !important; } .dashboard-header * { color: white !important; } .dashboard-header h1, .dashboard-header h2, .dashboard-header h3, .dashboard-header p, .dashboard-header span, .dashboard-header div { color: white !important; } .header-row { display: flex !important; align-items: center !important; justify-content: space-between !important; gap: 20px !important; } .header-left { flex: 1 !important; } .header-left * { color: white !important; } .header-right { display: flex !important; flex-direction: column !important; align-items: flex-end !important; gap: 10px !important; } .header-right * { color: white !important; } .user-welcome { margin: 0 !important; font-size: 16px !important; font-weight: 500 !important; color: white !important; } .user-welcome * { color: white !important; } .logout-btn { min-width: 100px !important; background: rgba(255, 255, 255, 0.2) !important; backdrop-filter: blur(10px) !important; border: 2px solid white !important; color: white !important; } .logout-btn:hover { background: white !important; color: #667eea !important; } /* Status Messages */ .status-success { color: #28a745 !important; font-weight: bold !important; } .status-error { color: #dc3545 !important; font-weight: bold !important; } /* Admin Panel Specific */ .admin-panel { border: 2px solid var(--border-color-primary) !important; padding: 20px !important; border-radius: 15px !important; margin-top: 20px !important; box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15) !important; } .admin-group { border: 2px solid var(--border-color-primary) !important; border-radius: 12px !important; padding: 25px !important; margin: 15px 0 !important; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1) !important; } .admin-section { margin-bottom: 30px !important; border: 2px solid var(--border-color-primary) !important; border-radius: 15px !important; padding: 20px !important; box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1) !important; } .admin-button { margin: 5px !important; } /* Label Styling */ label { font-weight: 600 !important; font-size: 14px !important; margin-bottom: 8px !important; } /* Info Text */ .gradio-info { font-size: 12px !important; font-style: italic !important; opacity: 0.8 !important; } /* Download Button */ .download-button { background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%) !important; color: white !important; box-shadow: 0 4px 8px rgba(79, 172, 254, 0.3) !important; } /* Scrollbar Styling */ ::-webkit-scrollbar { width: 10px !important; height: 10px !important; } ::-webkit-scrollbar-track { background: var(--background-fill-secondary) !important; border-radius: 10px !important; } ::-webkit-scrollbar-thumb { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border-radius: 10px !important; } ::-webkit-scrollbar-thumb:hover { background: linear-gradient(135deg, #764ba2 0%, #667eea 100%) !important; } /* Status Textbox Styling */ .gradio-textbox[label="Status"] { border: 2px solid var(--border-color-primary) !important; border-radius: 10px !important; padding: 15px !important; } /* Checkbox Styling */ input[type="checkbox"] { width: 20px !important; height: 20px !important; accent-color: #667eea !important; } /* Markdown Paragraphs */ p { line-height: 1.6 !important; } /* Section Dividers */ hr { border: none !important; height: 2px !important; background: var(--border-color-primary) !important; margin: 30px 0 !important; opacity: 0.3 !important; } /* Better spacing for form elements */ .gradio-form { gap: 15px !important; } /* Hover effects for cards */ .admin-section:hover { transform: translateY(-2px) !important; transition: all 0.3s ease !important; } /* Loading Animation Enhancement */ @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.5; } } .loading { animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite !important; } /* Improve link styling in markdown */ a { color: #667eea !important; text-decoration: none !important; font-weight: 600 !important; transition: all 0.3s ease !important; } a:hover { color: #764ba2 !important; text-decoration: underline !important; } /* Better spacing for rows within groups */ .gradio-group .gradio-row { margin-bottom: 10px !important; } """ ) as main_app: # State to track authentication auth_state = gr.State({"authenticated": False, "user": None}) # Main content area with gr.Column() as main_content: # Login section with gr.Row(visible=True) as login_section: with gr.Column(elem_classes="login-container"): gr.Markdown("# Dashboard Login") gr.Markdown("Please login to access Dashboard") with gr.Row(): username_input = gr.Textbox( label="Username", placeholder="Enter your username", interactive=True, scale=2 ) with gr.Row(): password_input = gr.Textbox( label="Password", placeholder="Enter your password", type="password", interactive=True, scale=2 ) with gr.Row(): login_btn = gr.Button("Login", variant="primary", scale=1) login_status = gr.Textbox( label="Status", value="Ready to login - Enter your credentials above", interactive=False, elem_classes="status-success" ) # Dashboard section (initially hidden) with gr.Column(visible=False) as dashboard_section: # Header with user info and logout with gr.Column(elem_classes="dashboard-header"): with gr.Row(elem_classes="header-row"): # Left side - Title and description with gr.Column(scale=3, elem_classes="header-left"): gr.Markdown("# Raagsan Dashboard") gr.Markdown("Extract and analyze content from websites and documents (PDF, DOC, CSV).") # Right side - User info and logout with gr.Column(scale=1, elem_classes="header-right"): user_info = gr.Markdown("Welcome, Guest", elem_classes="user-welcome") logout_btn = gr.Button("Logout", variant="stop", size="sm", elem_classes="logout-btn") # Create tabs with gr.Tabs(): create_text_content_tab() create_document_content_tab() create_archive_tab() create_keywords_management_tab() create_admin_tab() create_website_config_tab() def handle_login(username, password): """Handle login attempt""" if not username or not password: return "Please enter both username and password", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest") success, message = login_user(username, password) if success: return f"{message}", gr.update(visible=False), gr.update(visible=True), gr.update(visible=True, value=f"Welcome, {username}") else: return f"{message}", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest") def handle_logout(): """Handle logout""" message = logout_user() return f"{message}", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False, value="Welcome, Guest"), gr.update(value=""), gr.update(value="") # Event handlers login_btn.click( fn=handle_login, inputs=[username_input, password_input], outputs=[login_status, login_section, dashboard_section, user_info] ) logout_btn.click( fn=handle_logout, outputs=[login_status, login_section, dashboard_section, user_info, username_input, password_input] ) return main_app # Create the main app instance demo = create_main_app() if __name__ == "__main__": # Clean up expired sessions on startup auth_manager.cleanup_expired_sessions() # Launch the application demo.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=True )