| """ |
| DocStrange Hugging Face Spaces API |
| Deploy this on Hugging Face Spaces to provide DocStrange extraction API |
| """ |
| import os |
| import sys |
| import tempfile |
| from pathlib import Path |
|
|
| from fastapi import FastAPI, File, UploadFile, HTTPException |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| import uvicorn |
|
|
| |
# Put the sibling "docstrange" directory (one level above this file) at the
# front of the import search path so the import below can be attempted.
# NOTE(review): this adds the "docstrange" directory itself rather than its
# parent -- confirm that is where the importable package lives.
_app_dir = os.path.dirname(__file__)
_docstrange_dir = os.path.join(_app_dir, '..', 'docstrange')
sys.path.insert(0, _docstrange_dir)
|
|
# Optional dependency: record whether docstrange could be imported so the
# endpoints can report a clear error instead of failing at import time.
# The flag name (sic) is kept as-is because other code in this module reads it.
try:
    from docstrange import DocumentExtractor
except ImportError:
    HAS_DOCTSTRANGE = False
else:
    HAS_DOCTSTRANGE = True
|
|
# FastAPI application object; title, description and version are passed to
# FastAPI as the API's metadata.
app = FastAPI(
    title="DocStrange Document Extractor API",
    description="Extract structured data from documents using DocStrange AI",
    version="1.0.0",
)

# Allow cross-origin calls from any site. Credentials are disabled because the
# FastAPI CORS documentation states that the "*" wildcards for origins,
# methods and headers cannot be combined with allow_credentials=True; the
# endpoints visible in this file do not read cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Shared DocumentExtractor instance, created lazily by get_extractor().
extractor = None


def get_extractor():
    """Return the shared DocumentExtractor, creating it on first use.

    The instance is built with ``gpu=True`` when torch reports an available
    CUDA device, and with default arguments otherwise (including when torch
    cannot be imported or queried).

    Returns:
        The module-level DocumentExtractor instance.

    Raises:
        HTTPException: 500 if docstrange could not be imported at module load.
    """
    global extractor
    if extractor is not None:
        return extractor

    if not HAS_DOCTSTRANGE:
        raise HTTPException(status_code=500, detail="DocStrange not installed")

    try:
        import torch
        gpu_mode = torch.cuda.is_available()
    except Exception:
        # torch is optional here: any failure to import or query it means
        # "no GPU". Exception (not a bare except) lets SystemExit and
        # KeyboardInterrupt propagate.
        gpu_mode = False

    extractor = DocumentExtractor(gpu=True) if gpu_mode else DocumentExtractor()
    return extractor
|
|
|
|
@app.get("/")
def root():
    """Basic health check with service identification.

    Returns:
        dict with ``status``, ``service``, ``version``, ``gpu_available``
        (whether torch reports an available CUDA device) and ``docstrange``
        (whether the docstrange import succeeded).
    """
    try:
        import torch
        gpu_available = bool(torch.cuda.is_available())
    except Exception:
        # torch missing or unusable: report no GPU rather than failing the
        # health check.
        gpu_available = False

    return {
        "status": "ok",
        "service": "DocStrange API",
        "version": "1.0.0",
        # Previously this key echoed the docstrange import flag; it now
        # reflects actual CUDA availability.
        "gpu_available": gpu_available,
        "docstrange": HAS_DOCTSTRANGE,
    }
|
|
|
|
@app.get("/health")
def health():
    """Report service health with GPU and DocStrange availability.

    Returns:
        dict with ``status``, ``gpu`` (whether torch reports an available CUDA
        device), ``vram`` (total memory of CUDA device 0 formatted like
        "15.8GB", or "N/A" when no GPU is usable) and ``docstrange`` (whether
        the docstrange import succeeded).
    """
    gpu = False
    vram = "N/A"
    try:
        import torch
        gpu = torch.cuda.is_available()
        if gpu:
            # The device-properties attribute is `total_memory` (bytes).
            # The former `total_mem` raised AttributeError, which was
            # swallowed and made GPU hosts report gpu=False.
            total_bytes = torch.cuda.get_device_properties(0).total_memory
            vram = f"{total_bytes/1024**3:.1f}GB"
    except Exception:
        # torch missing or the device query failed: report no GPU.
        gpu = False
        vram = "N/A"

    return {
        "status": "ok",
        "gpu": gpu,
        "vram": vram,
        "docstrange": HAS_DOCTSTRANGE
    }
|
|
|
|
@app.post("/extract")
async def extract_document(
    file: UploadFile = File(...),
    output_format: str = "markdown"
):
    """
    Extract structured data from document

    The upload is written to a temporary file, handed to the shared
    extractor, and the temporary file is removed afterwards whether or not
    extraction succeeds.

    Args:
        file: Document file (PDF, DOCX, XLSX, Images, etc.)
        output_format: markdown, json, csv, html, text, flat-json, all

    Returns: JSON with extracted data

    Raises:
        HTTPException: 400 if no filename was sent or the extension is not
            supported; 500 if extraction fails. HTTPExceptions raised while
            obtaining the extractor are passed through unchanged.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    supported_formats = ['.pdf', '.docx', '.xlsx', '.pptx', '.png', '.jpg', '.jpeg',
                         '.bmp', '.tiff', '.webp', '.gif', '.txt', '.html', '.md', '.csv']
    suffix = Path(file.filename).suffix.lower()
    if suffix not in supported_formats:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported format: {suffix}. Supported: {supported_formats}"
        )

    tmp_path = None
    try:
        # delete=False so the file survives the `with` block for the
        # extractor; record the path before writing so a failed read/write
        # still gets cleaned up in `finally`.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())

        doc_extractor = get_extractor()
        result = doc_extractor.extract_document(tmp_path, output_format=output_format)

        metadata = result.get('metadata', {})
        response = {
            "success": True,
            "file_name": file.filename,
            "data": result.get('data', {}),
            "format": result.get('format', output_format),
            "metadata": {
                "file_size": metadata.get('file_size', 0),
                "engine": "docstrange",
                "gpu_mode": metadata.get('gpu_mode', False)
            }
        }
        return JSONResponse(content=response)

    except HTTPException:
        # Already a well-formed HTTP error (e.g. from get_extractor);
        # do not re-wrap it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Extraction failed: {str(e)}") from e
    finally:
        # Best-effort cleanup: a failure to delete must not mask the result.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
@app.post("/extract/markdown")
async def extract_to_markdown(file: UploadFile = File(...)):
    """Extract document to markdown only (lightweight)

    The upload is written to a temporary file, handed to the shared
    extractor with ``output_format='markdown'``, and the temporary file is
    removed afterwards whether or not extraction succeeds.

    Args:
        file: Document file to convert.

    Returns: dict with ``success``, ``markdown`` and ``file_name``

    Raises:
        HTTPException: 400 if no filename was sent; 500 if extraction fails.
            HTTPExceptions raised while obtaining the extractor are passed
            through unchanged.
    """
    if not file.filename:
        # Without this guard Path(None) raised TypeError and surfaced as 500.
        raise HTTPException(status_code=400, detail="No file provided")

    tmp_path = None
    try:
        suffix = Path(file.filename).suffix.lower()
        # delete=False so the file survives the `with` block; record the
        # path before writing so a failed read/write is still cleaned up.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())

        doc_extractor = get_extractor()
        result = doc_extractor.extract_document(tmp_path, output_format='markdown')

        return {
            "success": True,
            "markdown": result.get('data', ''),
            "file_name": file.filename
        }

    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Best-effort cleanup: a failure to delete must not mask the result.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
@app.post("/extract/tables")
async def extract_tables(file: UploadFile = File(...)):
    """Extract tables only from document

    The upload is written to a temporary file, extracted with
    ``output_format='json'``, and the ``tables`` entry of the returned data is
    reported. The temporary file is removed afterwards whether or not
    extraction succeeds.

    Args:
        file: Document file to scan for tables.

    Returns: dict with ``success``, ``tables``, ``tables_count`` and
        ``file_name``

    Raises:
        HTTPException: 400 if no filename was sent; 500 if extraction fails.
            HTTPExceptions raised while obtaining the extractor are passed
            through unchanged.
    """
    if not file.filename:
        # Without this guard Path(None) raised TypeError and surfaced as 500.
        raise HTTPException(status_code=400, detail="No file provided")

    tmp_path = None
    try:
        suffix = Path(file.filename).suffix.lower()
        # delete=False so the file survives the `with` block; record the
        # path before writing so a failed read/write is still cleaned up.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await file.read())

        doc_extractor = get_extractor()
        result = doc_extractor.extract_document(tmp_path, output_format='json')

        data = result.get('data', {})
        tables = data.get('tables', [])

        return {
            "success": True,
            "tables": tables,
            "tables_count": len(tables),
            "file_name": file.filename
        }

    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Best-effort cleanup: a failure to delete must not mask the result.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
if __name__ == "__main__":
    # Startup banner, then serve the app with uvicorn on all interfaces,
    # port 8080, with auto-reload enabled.
    rule = "=" * 60
    banner_lines = (
        rule,
        "DocStrange Document Extractor API",
        rule,
        "URL: http://localhost:8080",
        "Docs: http://localhost:8080/docs",
        rule,
    )
    for banner_line in banner_lines:
        print(banner_line)

    # The app is given as an import string ("module:attribute").
    uvicorn.run("app:app", host="0.0.0.0", port=8080, reload=True)
|
|