-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathemail_utils.py
155 lines (132 loc) · 6.12 KB
/
email_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import re
import base64
import random
import json
import time
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# Gmail API scope
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# File to persist generated variations
VARIATIONS_FILE = os.path.join('json_files', 'generated_variations.json')
def authenticate_gmail():
creds = None
# The token.json file stores the user's access and refresh tokens.
token_path = os.path.join('json_files', 'token.json')
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
# If there are no valid credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
credentials_path = os.path.join('json_files', 'credentials.json')
flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run.
token_path = os.path.join('json_files', 'token.json')
with open(token_path, 'w') as token:
token.write(creds.to_json())
return creds
def calculate_email_variations(email):
username, domain = email.split("@")
positions = len(username) - 1
total_variations = 2 ** positions
return total_variations
def email_exists_in_file(email):
if not os.path.exists(VARIATIONS_FILE):
return False
try:
with open(VARIATIONS_FILE, 'r') as f:
variations = json.load(f)
return email in variations
except (json.JSONDecodeError, ValueError):
# If the file is empty or corrupted, return False
return False
def save_email_to_file(email):
if os.path.exists(VARIATIONS_FILE):
try:
with open(VARIATIONS_FILE, 'r') as f:
variations = json.load(f)
except (json.JSONDecodeError, ValueError):
# If the file is empty or corrupted, start with an empty list
variations = []
else:
variations = []
variations.append(email)
with open(VARIATIONS_FILE, 'w') as f:
json.dump(variations, f, indent=4)
def generate_gmail_variation(base_email, max_attempts=20):
local_part, domain = base_email.split('@')
if len(local_part) <= 2:
return base_email # Not enough characters to add dots
for _ in range(max_attempts):
local_part_variants = list(local_part)
max_dots = len(local_part) - 2
num_dots = random.randint(1, max_dots) if max_dots > 0 else 0
dot_positions = sorted(random.sample(range(1, len(local_part) - 1), num_dots))
for i, pos in enumerate(dot_positions):
local_part_variants.insert(pos + i, '.')
new_email = ''.join(local_part_variants).strip('.') + '@' + domain
# Check if the email already exists in the file
if not email_exists_in_file(new_email):
save_email_to_file(new_email)
return new_email
raise ValueError("Unable to generate a unique Gmail variation after multiple attempts.")
def extract_verify_link(text):
pattern = r'https://app\.gptinf\.com/signup/[^"]+'
matches = re.findall(pattern, text)
return matches[0] if matches else None
def get_gmail_service(creds):
return build('gmail', 'v1', credentials=creds)
def search_for_confirmation_email(service, query='subject:Confirm your email - GPTinf'):
try:
results = service.users().messages().list(userId='me', q=query, maxResults=1).execute()
messages = results.get('messages', [])
if not messages:
print("No confirmation email found.")
return None
# Get the first (most recent) message
message = service.users().messages().get(userId='me', id=messages[0]['id'], format='full').execute()
return message
except Exception as e:
print(f"Error retrieving confirmation email: {e}")
return None
def get_message_body(message):
if 'parts' in message['payload']:
# Iterate through all parts of the email
for part in message['payload']['parts']:
# Check if the part is HTML
if part['mimeType'] == 'text/html':
msg_data = part['body'].get('data', '')
decoded_data = base64.urlsafe_b64decode(msg_data).decode('utf-8')
print(f"Extracted HTML part: {decoded_data[:500]}")
return decoded_data
# Fallback to plain text if no HTML part found
elif part['mimeType'] == 'text/plain':
msg_data = part['body'].get('data', '')
decoded_data = base64.urlsafe_b64decode(msg_data).decode('utf-8')
print(f"Extracted plain text part: {decoded_data[:500]}")
return decoded_data
else:
# Handle case where the message does not have parts (e.g., it's a single text/plain or text/html email)
msg_data = message['payload']['body'].get('data', '')
decoded_data = base64.urlsafe_b64decode(msg_data).decode('utf-8')
print(f"Extracted single body message: {decoded_data[:500]}")
return decoded_data
return ''
def wait_for_confirmation_email(service, max_wait_time=300, poll_interval=10):
time.sleep(7)
start_time = time.time()
while time.time() - start_time < max_wait_time:
confirmation_email = search_for_confirmation_email(service)
if confirmation_email:
print("Confirmation email found!")
return confirmation_email
print(f"Waiting for the confirmation email... polling again in {poll_interval} seconds.")
time.sleep(poll_interval) # Wait for the next poll
print(f"Timeout: Confirmation email not received within {max_wait_time} seconds.")
return None