import random
import re
import time

import httpx


# --- Custom Exceptions ---
class EmailFetcherError(Exception):
    """Base exception for EmailFetcher errors."""


class TokenError(EmailFetcherError):
    """Raised when there's an error obtaining the access token."""


class GraphApiError(EmailFetcherError):
    """Raised when there's an error during a Microsoft Graph API call."""


class InvalidRegexError(EmailFetcherError):
    """Raised when the provided regex pattern is invalid."""


# --- End Custom Exceptions ---


class EmailFetcher:
    """Fetch mailbox messages through Microsoft Graph using a refresh token.

    The access token is obtained lazily from ``TOKEN_URL`` on the first call
    to :meth:`fetch_emails` and cached on the instance. Requests answered
    with HTTP 429 are retried up to ``MAX_RETRIES`` times.
    """

    BASE_URL = "https://graph.microsoft.com/v1.0"
    TOKEN_URL = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
    MAX_RETRIES = 3  # Maximum number of retry attempts
    BASE_RETRY_DELAY = 2  # Base delay in seconds

    def __init__(self, client_id, refresh_token):
        """Store the OAuth credentials; no network request is made here."""
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.access_token = None

    def _wait_before_retry(self, response, retries):
        """Sleep before retrying a request that was answered with HTTP 429.

        A purely numeric ``Retry-After`` header is honoured as a number of
        seconds; otherwise exponential backoff with up to one second of
        random jitter is used.

        Args:
            response: The 429 response whose headers are inspected.
            retries: Number of retries already performed (0 on first retry).
        """
        retry_after = response.headers.get("Retry-After")
        if retry_after and retry_after.isdigit():
            delay = int(retry_after)
        else:
            # Exponential backoff with jitter
            delay = self.BASE_RETRY_DELAY * (2 ** retries) + random.uniform(0, 1)

        print(
            f"Rate limited (429). Retrying in {delay:.2f} seconds... "
            f"(Attempt {retries + 1}/{self.MAX_RETRIES})"
        )
        time.sleep(delay)

    def _get_access_token(self):
        """Exchange the refresh token for an access token and cache it.

        Returns:
            The access token string (also stored in ``self.access_token``).

        Raises:
            TokenError: On rate limiting that outlasts the retries, any other
                HTTP or network failure, an ``error`` field in the response
                body, or a response without ``access_token``.
        """
        data = {
            "client_id": self.client_id,
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
            "scope": "https://graph.microsoft.com/.default",
        }

        for retries in range(self.MAX_RETRIES + 1):
            try:
                response = httpx.post(self.TOKEN_URL, data=data)

                # Handle 429 Too Many Requests specifically
                if response.status_code == 429:
                    if retries == self.MAX_RETRIES:
                        error_msg = (
                            f"Token request failed after {self.MAX_RETRIES} "
                            "retries due to rate limiting (429)"
                        )
                        print(error_msg)
                        raise TokenError(error_msg)
                    self._wait_before_retry(response, retries)
                    continue

                response.raise_for_status()  # Raise for other bad status codes

                result = response.json()
                if "error" in result:
                    error_msg = (
                        f"Access token error: {result.get('error')}, "
                        f"Description: {result.get('error_description')}"
                    )
                    print(error_msg)  # Also print for immediate feedback
                    raise TokenError(error_msg)

                self.access_token = result.get("access_token")
                if not self.access_token:
                    raise TokenError("No access token found in the response.")
                return self.access_token

            except TokenError:
                # Already a domain error raised above: do not wrap it again.
                raise
            except httpx.HTTPStatusError as e:
                # 429 never reaches here; it is handled before raise_for_status().
                error_msg = (
                    f"Token request failed with status "
                    f"{e.response.status_code}: {e.response.text}"
                )
                print(error_msg)
                raise TokenError(error_msg) from e
            except httpx.RequestError as e:
                error_msg = f"Token request network error: {e}"
                print(error_msg)
                raise TokenError(error_msg) from e
            except Exception as e:
                error_msg = f"An unexpected error occurred during token request: {e}"
                print(error_msg)
                raise TokenError(error_msg) from e

        # Defensive: every loop iteration returns, raises, or continues, and
        # the last iteration cannot continue, so this should be unreachable.
        raise TokenError(
            f"Token request failed after {self.MAX_RETRIES} retries due to rate limiting (429)"
        )

    def fetch_emails(self, top=50, sender_filter=None):
        """Fetch the most recent messages, optionally filtered by sender.

        Filtering happens client-side after the request, so with a filter the
        result may contain fewer than ``top`` messages.

        Args:
            top: Maximum number of emails to request from the API.
            sender_filter: Optional text; only messages whose sender address
                contains it (case-insensitive) are returned.

        Returns:
            List of message dicts as returned by the API.

        Raises:
            GraphApiError: If the access token cannot be obtained (chained
                from the underlying TokenError) or the API request fails.
        """
        try:
            if not self.access_token:
                self._get_access_token()
        except TokenError as e:
            raise GraphApiError(
                "Failed to obtain access token before fetching emails."
            ) from e

        url = f"{self.BASE_URL}/me/messages"
        # Only ordering and page size are sent; no server-side filtering.
        params = {"$orderby": "receivedDateTime desc", "$top": top}
        headers = {"Authorization": f"Bearer {self.access_token}"}

        for retries in range(self.MAX_RETRIES + 1):
            try:
                response = httpx.get(url, headers=headers, params=params)

                # Handle 429 Too Many Requests specifically
                if response.status_code == 429:
                    if retries == self.MAX_RETRIES:
                        error_msg = (
                            f"Graph API request failed after {self.MAX_RETRIES} "
                            "retries due to rate limiting (429)"
                        )
                        print(error_msg)
                        raise GraphApiError(error_msg)
                    self._wait_before_retry(response, retries)
                    continue

                response.raise_for_status()  # Check for other HTTP errors

                all_emails = response.json().get("value", [])
                if not (sender_filter and all_emails):
                    return all_emails

                wanted = sender_filter.lower()
                # `or {}` guards against "from"/"emailAddress" being present
                # but null, which `.get(key, {})` alone does not cover.
                return [
                    email
                    for email in all_emails
                    if wanted
                    in (
                        ((email.get("from") or {}).get("emailAddress") or {}).get("address")
                        or ""
                    ).lower()
                ]

            except GraphApiError:
                # Already reported above; re-raise instead of letting the
                # generic handler wrap it as an "unexpected" error.
                raise
            except httpx.HTTPStatusError as e:
                # 429 never reaches here; it is handled before raise_for_status().
                error_msg = (
                    f"Graph API request failed with status "
                    f"{e.response.status_code}: {e.response.text}"
                )
                print(error_msg)
                if e.response.status_code == 401:
                    # Unauthorized: drop the cached token so that the next
                    # call to fetch_emails requests a fresh one.
                    print(
                        "Access token might be invalid or expired. Clearing token for retry."
                    )
                    self.access_token = None
                raise GraphApiError(error_msg) from e
            except httpx.RequestError as e:
                error_msg = f"Graph API request network error: {e}"
                print(error_msg)
                raise GraphApiError(error_msg) from e
            except Exception as e:
                error_msg = f"An unexpected error occurred during Graph API request: {e}"
                print(error_msg)
                raise GraphApiError(error_msg) from e

        # Defensive: see the matching note in _get_access_token.
        raise GraphApiError(
            f"Graph API request failed after {self.MAX_RETRIES} retries due to rate limiting (429)"
        )

    def extract_verification_code(self, email, pattern):
        """Search a message for a verification code using a regex.

        The subject, body preview and full body are searched in that order
        and the first match wins.

        Args:
            email: Message dict; anything falsy or not a dict yields None.
            pattern: Regular expression. If it has at least one capture
                group, group 1 is returned; otherwise the whole match.

        Returns:
            The matched text, or None when nothing matches.

        Raises:
            InvalidRegexError: If ``pattern`` does not compile.
        """
        if not email or not isinstance(email, dict):
            return None

        subject = email.get("subject", "")
        body_preview = email.get("bodyPreview", "")
        # `or {}` guards against a "body" key that is present but null.
        body_content = (email.get("body") or {}).get("content", "")

        try:
            regex = re.compile(pattern)
        except re.error as e:
            error_msg = f"Invalid regex pattern '{pattern}': {e}"
            print(error_msg)
            raise InvalidRegexError(error_msg) from e

        for text_source in (subject, body_preview, body_content):
            if not text_source:  # Skip None or empty sources
                continue
            match = regex.search(text_source)
            if match:
                return match.group(1) if regex.groups >= 1 else match.group(0)

        return None  # No code found


def extract_email_content(client_id, refresh_token):
    """Print the newest message of an account and the code found in it.

    Args:
        client_id: OAuth client ID passed to EmailFetcher.
        refresh_token: OAuth refresh token passed to EmailFetcher.
    """
    fetcher = EmailFetcher(client_id, refresh_token)

    # Only the single most recent email is requested.
    emails = fetcher.fetch_emails(top=1)

    for email in emails:
        print(email)
        # Extract the verification code with a regular expression.
        code = fetcher.extract_verification_code(
            email=email,
            pattern=r"验证码.*?(\d{6})",  # Adjust to the actual email content
        )
        print("code: ", code)


if __name__ == "__main__":
    # accounts.txt holds one account per line, four fields separated by "----":
    #   email----password----refresh token----client ID
    with open("accounts.txt", "r") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # Tolerate blank lines, e.g. a trailing newline

            fields = line.split("----")
            if len(fields) != 4:
                print(
                    f"Skipping line {line_number}: expected 4 fields "
                    f"separated by '----', got {len(fields)}"
                )
                continue

            _address, _password, refresh_token, client_id = fields
            try:
                extract_email_content(client_id, refresh_token)
            except EmailFetcherError as e:
                # One failing account must not abort the remaining ones.
                print(f"Failed to process account on line {line_number}: {e}")
            print("=" * 30)