| | """ |
| | RPC Node Collectors |
| | Fetches blockchain data from RPC endpoints (Infura, Alchemy, Ankr, etc.) |
| | """ |
| |
|
| | import asyncio |
| | from datetime import datetime, timezone |
| | from typing import Dict, List, Optional, Any |
| | from utils.api_client import get_client |
| | from utils.logger import setup_logger, log_api_request, log_error |
| |
|
| | logger = setup_logger("rpc_collector") |
| |
|
| |
|
| | async def get_eth_block_number(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
| | """ |
| | Fetch latest Ethereum block number from RPC endpoint |
| | |
| | Args: |
| | provider: Provider name (e.g., "Infura", "Alchemy") |
| | rpc_url: RPC endpoint URL |
| | api_key: Optional API key to append to URL |
| | |
| | Returns: |
| | Dict with provider, category, data, timestamp, success, error |
| | """ |
| | category = "rpc_nodes" |
| | endpoint = "eth_blockNumber" |
| |
|
| | logger.info(f"Fetching block number from {provider}") |
| |
|
| | try: |
| | client = get_client() |
| |
|
| | |
| | url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
| |
|
| | |
| | payload = { |
| | "jsonrpc": "2.0", |
| | "method": "eth_blockNumber", |
| | "params": [], |
| | "id": 1 |
| | } |
| |
|
| | headers = {"Content-Type": "application/json"} |
| |
|
| | |
| | response = await client.post(url, json=payload, headers=headers, timeout=10) |
| |
|
| | |
| | log_api_request( |
| | logger, |
| | provider, |
| | endpoint, |
| | response.get("response_time_ms", 0), |
| | "success" if response["success"] else "error", |
| | response.get("status_code") |
| | ) |
| |
|
| | if not response["success"]: |
| | error_msg = response.get("error_message", "Unknown error") |
| | log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg, |
| | "error_type": response.get("error_type") |
| | } |
| |
|
| | |
| | data = response["data"] |
| |
|
| | |
| | block_data = None |
| | if isinstance(data, dict) and "result" in data: |
| | hex_block = data["result"] |
| | block_number = int(hex_block, 16) if hex_block else 0 |
| | block_data = { |
| | "block_number": block_number, |
| | "hex": hex_block, |
| | "chain": "ethereum" |
| | } |
| |
|
| | logger.info(f"{provider} - {endpoint} - Block: {block_data.get('block_number', 'N/A')}") |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": block_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "response_time_ms": response.get("response_time_ms", 0) |
| | } |
| |
|
| | except Exception as e: |
| | error_msg = f"Unexpected error: {str(e)}" |
| | log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg, |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_eth_gas_price(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
| | """ |
| | Fetch current gas price from RPC endpoint |
| | |
| | Args: |
| | provider: Provider name |
| | rpc_url: RPC endpoint URL |
| | api_key: Optional API key |
| | |
| | Returns: |
| | Dict with gas price data |
| | """ |
| | category = "rpc_nodes" |
| | endpoint = "eth_gasPrice" |
| |
|
| | logger.info(f"Fetching gas price from {provider}") |
| |
|
| | try: |
| | client = get_client() |
| | url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
| |
|
| | payload = { |
| | "jsonrpc": "2.0", |
| | "method": "eth_gasPrice", |
| | "params": [], |
| | "id": 1 |
| | } |
| |
|
| | headers = {"Content-Type": "application/json"} |
| | response = await client.post(url, json=payload, headers=headers, timeout=10) |
| |
|
| | log_api_request( |
| | logger, |
| | provider, |
| | endpoint, |
| | response.get("response_time_ms", 0), |
| | "success" if response["success"] else "error", |
| | response.get("status_code") |
| | ) |
| |
|
| | if not response["success"]: |
| | error_msg = response.get("error_message", "Unknown error") |
| | log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg, |
| | "error_type": response.get("error_type") |
| | } |
| |
|
| | data = response["data"] |
| | gas_data = None |
| |
|
| | if isinstance(data, dict) and "result" in data: |
| | hex_gas = data["result"] |
| | gas_wei = int(hex_gas, 16) if hex_gas else 0 |
| | gas_gwei = gas_wei / 1e9 |
| |
|
| | gas_data = { |
| | "gas_price_wei": gas_wei, |
| | "gas_price_gwei": round(gas_gwei, 2), |
| | "hex": hex_gas, |
| | "chain": "ethereum" |
| | } |
| |
|
| | logger.info(f"{provider} - {endpoint} - Gas: {gas_data.get('gas_price_gwei', 'N/A')} Gwei") |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": gas_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "response_time_ms": response.get("response_time_ms", 0) |
| | } |
| |
|
| | except Exception as e: |
| | error_msg = f"Unexpected error: {str(e)}" |
| | log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg, |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def get_eth_chain_id(provider: str, rpc_url: str, api_key: Optional[str] = None) -> Dict[str, Any]: |
| | """ |
| | Fetch chain ID from RPC endpoint |
| | |
| | Args: |
| | provider: Provider name |
| | rpc_url: RPC endpoint URL |
| | api_key: Optional API key |
| | |
| | Returns: |
| | Dict with chain ID data |
| | """ |
| | category = "rpc_nodes" |
| | endpoint = "eth_chainId" |
| |
|
| | try: |
| | client = get_client() |
| | url = f"{rpc_url}/{api_key}" if api_key else rpc_url |
| |
|
| | payload = { |
| | "jsonrpc": "2.0", |
| | "method": "eth_chainId", |
| | "params": [], |
| | "id": 1 |
| | } |
| |
|
| | headers = {"Content-Type": "application/json"} |
| | response = await client.post(url, json=payload, headers=headers, timeout=10) |
| |
|
| | if not response["success"]: |
| | error_msg = response.get("error_message", "Unknown error") |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": error_msg |
| | } |
| |
|
| | data = response["data"] |
| | chain_data = None |
| |
|
| | if isinstance(data, dict) and "result" in data: |
| | hex_chain = data["result"] |
| | chain_id = int(hex_chain, 16) if hex_chain else 0 |
| |
|
| | |
| | chain_names = { |
| | 1: "Ethereum Mainnet", |
| | 3: "Ropsten", |
| | 4: "Rinkeby", |
| | 5: "Goerli", |
| | 11155111: "Sepolia", |
| | 56: "BSC Mainnet", |
| | 97: "BSC Testnet", |
| | 137: "Polygon Mainnet", |
| | 80001: "Mumbai Testnet" |
| | } |
| |
|
| | chain_data = { |
| | "chain_id": chain_id, |
| | "chain_name": chain_names.get(chain_id, f"Unknown (ID: {chain_id})"), |
| | "hex": hex_chain |
| | } |
| |
|
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": chain_data, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": True, |
| | "error": None, |
| | "response_time_ms": response.get("response_time_ms", 0) |
| | } |
| |
|
| | except Exception as e: |
| | return { |
| | "provider": provider, |
| | "category": category, |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(e), |
| | "error_type": "exception" |
| | } |
| |
|
| |
|
| | async def collect_infura_data(api_key: Optional[str] = None) -> List[Dict[str, Any]]: |
| | """ |
| | Collect data from Infura RPC endpoints |
| | |
| | Args: |
| | api_key: Infura project ID |
| | |
| | Returns: |
| | List of results from Infura endpoints |
| | """ |
| | provider = "Infura" |
| | rpc_url = "https://mainnet.infura.io/v3" |
| |
|
| | if not api_key: |
| | logger.warning(f"{provider} - No API key provided, skipping") |
| | return [{ |
| | "provider": provider, |
| | "category": "rpc_nodes", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": "API key required", |
| | "error_type": "missing_api_key" |
| | }] |
| |
|
| | logger.info(f"Starting {provider} data collection") |
| |
|
| | results = await asyncio.gather( |
| | get_eth_block_number(provider, rpc_url, api_key), |
| | get_eth_gas_price(provider, rpc_url, api_key), |
| | get_eth_chain_id(provider, rpc_url, api_key), |
| | return_exceptions=True |
| | ) |
| |
|
| | processed = [] |
| | for result in results: |
| | if isinstance(result, Exception): |
| | logger.error(f"{provider} - Collector failed: {str(result)}") |
| | processed.append({ |
| | "provider": provider, |
| | "category": "rpc_nodes", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(result), |
| | "error_type": "exception" |
| | }) |
| | else: |
| | processed.append(result) |
| |
|
| | successful = sum(1 for r in processed if r.get("success", False)) |
| | logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
| |
|
| | return processed |
| |
|
| |
|
| | async def collect_alchemy_data(api_key: Optional[str] = None) -> List[Dict[str, Any]]: |
| | """ |
| | Collect data from Alchemy RPC endpoints |
| | |
| | Args: |
| | api_key: Alchemy API key |
| | |
| | Returns: |
| | List of results from Alchemy endpoints |
| | """ |
| | provider = "Alchemy" |
| | rpc_url = "https://eth-mainnet.g.alchemy.com/v2" |
| |
|
| | if not api_key: |
| | logger.warning(f"{provider} - No API key provided, using free tier") |
| | |
| | api_key = "demo" |
| |
|
| | logger.info(f"Starting {provider} data collection") |
| |
|
| | results = await asyncio.gather( |
| | get_eth_block_number(provider, rpc_url, api_key), |
| | get_eth_gas_price(provider, rpc_url, api_key), |
| | get_eth_chain_id(provider, rpc_url, api_key), |
| | return_exceptions=True |
| | ) |
| |
|
| | processed = [] |
| | for result in results: |
| | if isinstance(result, Exception): |
| | logger.error(f"{provider} - Collector failed: {str(result)}") |
| | processed.append({ |
| | "provider": provider, |
| | "category": "rpc_nodes", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(result), |
| | "error_type": "exception" |
| | }) |
| | else: |
| | processed.append(result) |
| |
|
| | successful = sum(1 for r in processed if r.get("success", False)) |
| | logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
| |
|
| | return processed |
| |
|
| |
|
| | async def collect_ankr_data() -> List[Dict[str, Any]]: |
| | """ |
| | Collect data from Ankr public RPC endpoints (no key required) |
| | |
| | Returns: |
| | List of results from Ankr endpoints |
| | """ |
| | provider = "Ankr" |
| | rpc_url = "https://rpc.ankr.com/eth" |
| |
|
| | logger.info(f"Starting {provider} data collection") |
| |
|
| | results = await asyncio.gather( |
| | get_eth_block_number(provider, rpc_url), |
| | get_eth_gas_price(provider, rpc_url), |
| | get_eth_chain_id(provider, rpc_url), |
| | return_exceptions=True |
| | ) |
| |
|
| | processed = [] |
| | for result in results: |
| | if isinstance(result, Exception): |
| | logger.error(f"{provider} - Collector failed: {str(result)}") |
| | processed.append({ |
| | "provider": provider, |
| | "category": "rpc_nodes", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(result), |
| | "error_type": "exception" |
| | }) |
| | else: |
| | processed.append(result) |
| |
|
| | successful = sum(1 for r in processed if r.get("success", False)) |
| | logger.info(f"{provider} - Collection complete: {successful}/{len(processed)} successful") |
| |
|
| | return processed |
| |
|
| |
|
| | async def collect_public_rpc_data() -> List[Dict[str, Any]]: |
| | """ |
| | Collect data from free public RPC endpoints |
| | |
| | Returns: |
| | List of results from public endpoints |
| | """ |
| | logger.info("Starting public RPC data collection") |
| |
|
| | public_rpcs = [ |
| | ("Cloudflare", "https://cloudflare-eth.com"), |
| | ("PublicNode", "https://ethereum.publicnode.com"), |
| | ("LlamaNodes", "https://eth.llamarpc.com"), |
| | ] |
| |
|
| | all_results = [] |
| |
|
| | for provider, rpc_url in public_rpcs: |
| | results = await asyncio.gather( |
| | get_eth_block_number(provider, rpc_url), |
| | get_eth_gas_price(provider, rpc_url), |
| | return_exceptions=True |
| | ) |
| |
|
| | for result in results: |
| | if isinstance(result, Exception): |
| | logger.error(f"{provider} - Collector failed: {str(result)}") |
| | all_results.append({ |
| | "provider": provider, |
| | "category": "rpc_nodes", |
| | "data": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat(), |
| | "success": False, |
| | "error": str(result), |
| | "error_type": "exception" |
| | }) |
| | else: |
| | all_results.append(result) |
| |
|
| | successful = sum(1 for r in all_results if r.get("success", False)) |
| | logger.info(f"Public RPC collection complete: {successful}/{len(all_results)} successful") |
| |
|
| | return all_results |
| |
|
| |
|
| | async def collect_rpc_data( |
| | infura_key: Optional[str] = None, |
| | alchemy_key: Optional[str] = None |
| | ) -> List[Dict[str, Any]]: |
| | """ |
| | Main function to collect RPC data from all sources |
| | |
| | Args: |
| | infura_key: Infura project ID |
| | alchemy_key: Alchemy API key |
| | |
| | Returns: |
| | List of results from all RPC collectors |
| | """ |
| | logger.info("Starting RPC data collection from all sources") |
| |
|
| | |
| | all_results = [] |
| |
|
| | |
| | if infura_key: |
| | infura_results = await collect_infura_data(infura_key) |
| | all_results.extend(infura_results) |
| |
|
| | |
| | alchemy_results = await collect_alchemy_data(alchemy_key) |
| | all_results.extend(alchemy_results) |
| |
|
| | |
| | ankr_results = await collect_ankr_data() |
| | all_results.extend(ankr_results) |
| |
|
| | |
| | public_results = await collect_public_rpc_data() |
| | all_results.extend(public_results) |
| |
|
| | |
| | successful = sum(1 for r in all_results if r.get("success", False)) |
| | logger.info(f"RPC data collection complete: {successful}/{len(all_results)} successful") |
| |
|
| | return all_results |
| |
|
| |
|
| | class RPCNodeCollector: |
| | """ |
| | RPC Node Collector class for WebSocket streaming interface |
| | Wraps the standalone RPC node collection functions |
| | """ |
| |
|
| | def __init__(self, config: Any = None): |
| | """ |
| | Initialize the RPC node collector |
| | |
| | Args: |
| | config: Configuration object (optional, for compatibility) |
| | """ |
| | self.config = config |
| | self.logger = logger |
| |
|
| | async def collect(self) -> Dict[str, Any]: |
| | """ |
| | Collect RPC node data from all sources |
| | |
| | Returns: |
| | Dict with aggregated RPC node data |
| | """ |
| | import os |
| | infura_key = os.getenv("INFURA_API_KEY") |
| | alchemy_key = os.getenv("ALCHEMY_API_KEY") |
| | results = await collect_rpc_data(infura_key, alchemy_key) |
| |
|
| | |
| | aggregated = { |
| | "nodes": [], |
| | "active_nodes": 0, |
| | "total_nodes": 0, |
| | "average_latency": 0, |
| | "events": [], |
| | "block_number": None, |
| | "timestamp": datetime.now(timezone.utc).isoformat() |
| | } |
| |
|
| | total_latency = 0 |
| | latency_count = 0 |
| |
|
| | for result in results: |
| | aggregated["total_nodes"] += 1 |
| |
|
| | if result.get("success"): |
| | aggregated["active_nodes"] += 1 |
| | provider = result.get("provider", "unknown") |
| | response_time = result.get("response_time_ms", 0) |
| | data = result.get("data", {}) |
| |
|
| | |
| | if response_time: |
| | total_latency += response_time |
| | latency_count += 1 |
| |
|
| | |
| | node_info = { |
| | "provider": provider, |
| | "response_time_ms": response_time, |
| | "status": "active", |
| | "data": data |
| | } |
| |
|
| | |
| | if "result" in data and isinstance(data["result"], str): |
| | try: |
| | block_number = int(data["result"], 16) |
| | node_info["block_number"] = block_number |
| | if aggregated["block_number"] is None or block_number > aggregated["block_number"]: |
| | aggregated["block_number"] = block_number |
| | except: |
| | pass |
| |
|
| | aggregated["nodes"].append(node_info) |
| |
|
| | |
| | if latency_count > 0: |
| | aggregated["average_latency"] = total_latency / latency_count |
| |
|
| | return aggregated |
| |
|
| |
|
| | |
| | if __name__ == "__main__": |
| | async def main(): |
| | import os |
| |
|
| | infura_key = os.getenv("INFURA_API_KEY") |
| | alchemy_key = os.getenv("ALCHEMY_API_KEY") |
| |
|
| | results = await collect_rpc_data(infura_key, alchemy_key) |
| |
|
| | print("\n=== RPC Data Collection Results ===") |
| | for result in results: |
| | print(f"\nProvider: {result['provider']}") |
| | print(f"Success: {result['success']}") |
| | if result['success']: |
| | print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms") |
| | data = result.get('data', {}) |
| | if data: |
| | print(f"Data: {data}") |
| | else: |
| | print(f"Error: {result.get('error', 'Unknown')}") |
| |
|
| | asyncio.run(main()) |
| |
|