-
Notifications
You must be signed in to change notification settings - Fork 1k
/
main.py
2235 lines (1915 loc) · 98.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
from dotenv import load_dotenv
import json
from tavily import TavilyClient
import base64
from PIL import Image
import io
import re
from anthropic import Anthropic, APIStatusError, APIError
import difflib
import time
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.markdown import Markdown
import asyncio
from prompt_toolkit import PromptSession
from prompt_toolkit.styles import Style
import glob
import speech_recognition as sr
import websockets
from pydub import AudioSegment
from pydub.playback import play
import datetime
import venv
import sys
import signal
import logging
from typing import Tuple, Optional, Dict, Any
import mimetypes
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
import subprocess
import shutil
from typing import AsyncIterable
# Configure logging: errors only, timestamped, on the root logger.
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')
# Load environment variables from .env file
load_dotenv()
# Map of spoken phrases -> internal action names dispatched by process_voice_command().
VOICE_COMMANDS = {
    "exit voice mode": "exit_voice_mode",
    "save chat": "save_chat",
    "reset conversation": "reset_conversation"
}
# Speech-recognition objects; created lazily by initialize_speech_recognition().
recognizer = None
microphone = None
# ElevenLabs text-to-speech (TTS) configuration.
tts_enabled = True  # master switch for TTS support
use_tts = False  # whether the current session speaks responses aloud
ELEVEN_LABS_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')
VOICE_ID = 'YOUR VOICE ID'  # placeholder: replace with a real ElevenLabs voice id
MODEL_ID = 'eleven_turbo_v2_5'
def is_installed(lib_name):
    """Return True when an executable named *lib_name* can be found on PATH."""
    located = shutil.which(lib_name)
    return located is not None
async def text_chunker(text: str) -> AsyncIterable[str]:
    """Split text into chunks, ensuring to not break sentences.

    Yields each chunk with a trailing space. A chunk is closed whenever the
    current character is a splitter (punctuation or space), so words are never
    broken in the middle; trailing text without a splitter is flushed at the end.

    Fix: the original first branch (`if buffer.endswith(splitters)`) was
    unreachable dead code — `buffer` is reset to "" every time a splitter
    character is consumed, so it can never end with a splitter — and has been
    removed. Behavior is unchanged.
    """
    splitters = (".", ",", "?", "!", ";", ":", "—", "-", "(", ")", "[", "]", "}", " ")
    buffer = ""
    for char in text:
        if char in splitters:
            # Close the current chunk at the splitter character.
            yield buffer + char + " "
            buffer = ""
        else:
            buffer += char
    # Flush any trailing text that did not end with a splitter.
    if buffer:
        yield buffer + " "
async def stream_audio(audio_stream):
    """Stream audio data using mpv player.

    Consumes *audio_stream* (an async iterator of MP3 byte chunks) and plays it.
    Prefers piping chunks into an external mpv process for low-latency playback;
    falls back to buffering everything and playing via pydub when mpv is absent.
    """
    if not is_installed("mpv"):
        console.print("mpv not found. Installing alternative audio playback...", style="bold yellow")
        # Fall back to pydub playback if mpv is not available.
        # NOTE: this path buffers the entire stream in memory before playing.
        audio_data = b''.join([chunk async for chunk in audio_stream])
        audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
        play(audio)
        return
    # Feed mpv from our stdin ("fd://0"); --no-cache keeps playback latency low.
    mpv_process = subprocess.Popen(
        ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"],
        stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    console.print("Started streaming audio", style="bold green")
    try:
        async for chunk in audio_stream:
            if chunk:
                mpv_process.stdin.write(chunk)
                mpv_process.stdin.flush()
    except Exception as e:
        console.print(f"Error during audio streaming: {str(e)}", style="bold red")
    finally:
        # Close stdin so mpv sees EOF, then wait for the process to exit.
        if mpv_process.stdin:
            mpv_process.stdin.close()
        mpv_process.wait()
async def text_to_speech(text):
    """Speak *text* aloud via the ElevenLabs streaming websocket API.

    Audio chunks are played through stream_audio() concurrently while text is
    still being sent. Falls back to printing the text when no API key is
    configured or when any connection/TTS error occurs.
    """
    if not ELEVEN_LABS_API_KEY:
        console.print("ElevenLabs API key not found. Text-to-speech is disabled.", style="bold yellow")
        console.print(text)
        return
    uri = f"wss://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream-input?model_id={MODEL_ID}"
    try:
        async with websockets.connect(uri, extra_headers={'xi-api-key': ELEVEN_LABS_API_KEY}) as websocket:
            # Send initial message: a single space primes the stream and carries
            # the voice settings for the session.
            await websocket.send(json.dumps({
                "text": " ",
                "voice_settings": {"stability": 0.5, "similarity_boost": 0.75},
                "xi_api_key": ELEVEN_LABS_API_KEY,
            }))
            # Set up listener for audio chunks.
            async def listen():
                # Yields decoded audio bytes until the server marks the stream final
                # or the connection drops.
                while True:
                    try:
                        message = await websocket.recv()
                        data = json.loads(message)
                        if data.get("audio"):
                            yield base64.b64decode(data["audio"])
                        elif data.get('isFinal'):
                            break
                    except websockets.exceptions.ConnectionClosed:
                        logging.error("WebSocket connection closed unexpectedly")
                        break
                    except Exception as e:
                        logging.error(f"Error processing audio message: {str(e)}")
                        break
            # Start audio streaming task (plays audio concurrently with text sending).
            stream_task = asyncio.create_task(stream_audio(listen()))
            # Send text in chunks so generation can start before the full text is sent.
            async for chunk in text_chunker(text):
                try:
                    await websocket.send(json.dumps({"text": chunk, "try_trigger_generation": True}))
                except Exception as e:
                    logging.error(f"Error sending text chunk: {str(e)}")
                    break
            # Send closing message (empty text signals end of input to the server).
            await websocket.send(json.dumps({"text": ""}))
            # Wait for streaming/playback to complete.
            await stream_task
    except websockets.exceptions.InvalidStatusCode as e:
        logging.error(f"Failed to connect to ElevenLabs API: {e}")
        console.print(f"Failed to connect to ElevenLabs API: {e}", style="bold red")
        console.print("Fallback: Printing the text instead.", style="bold yellow")
        console.print(text)
    except Exception as e:
        logging.error(f"Error in text-to-speech: {str(e)}")
        console.print(f"Error in text-to-speech: {str(e)}", style="bold red")
        console.print("Fallback: Printing the text instead.", style="bold yellow")
        console.print(text)
def initialize_speech_recognition():
    """(Re)create the global recognizer/microphone and calibrate for ambient noise."""
    global recognizer, microphone
    recognizer = sr.Recognizer()
    microphone = sr.Microphone()
    # Adjust for ambient noise: listens for ~1 second to set the energy threshold.
    with microphone as source:
        recognizer.adjust_for_ambient_noise(source, duration=1)
    logging.info("Speech recognition initialized")
async def voice_input(max_retries=3):
    """Capture one spoken utterance and return it lowercased.

    Retries up to *max_retries* times on timeouts or unintelligible speech.
    Returns None immediately on service/unexpected errors, or after all
    retries are exhausted (the caller then falls back to text input).
    """
    global recognizer, microphone
    for attempt in range(max_retries):
        # Reinitialize speech recognition objects before each attempt
        initialize_speech_recognition()
        try:
            with microphone as source:
                console.print("Listening... Speak now.", style="bold green")
                audio = recognizer.listen(source, timeout=5)
                console.print("Processing speech...", style="bold yellow")
                text = recognizer.recognize_google(audio)
                console.print(f"You said: {text}", style="cyan")
                return text.lower()
        except sr.WaitTimeoutError:
            # Recoverable: no speech within the 5s window — retry.
            console.print(f"No speech detected. Attempt {attempt + 1} of {max_retries}.", style="bold red")
            logging.warning(f"No speech detected. Attempt {attempt + 1} of {max_retries}")
        except sr.UnknownValueError:
            # Recoverable: audio captured but not understood — retry.
            console.print(f"Speech was unintelligible. Attempt {attempt + 1} of {max_retries}.", style="bold red")
            logging.warning(f"Speech was unintelligible. Attempt {attempt + 1} of {max_retries}")
        except sr.RequestError as e:
            # Not recoverable here: the recognition service itself failed.
            console.print(f"Could not request results from speech recognition service; {e}", style="bold red")
            logging.error(f"Could not request results from speech recognition service; {e}")
            return None
        except Exception as e:
            # Not recoverable: bail out rather than loop on an unknown failure.
            console.print(f"Unexpected error in voice input: {str(e)}", style="bold red")
            logging.error(f"Unexpected error in voice input: {str(e)}")
            return None
        # Add a short delay between attempts
        await asyncio.sleep(1)
    console.print("Max retries reached. Returning to text input mode.", style="bold red")
    logging.info("Max retries reached in voice input. Returning to text input mode.")
    return None
def cleanup_speech_recognition():
    """Release the global speech-recognition objects.

    voice_input() re-initializes them before each attempt, so dropping the
    references here is sufficient cleanup.
    """
    global recognizer, microphone
    recognizer, microphone = None, None
    logging.info('Speech recognition objects cleaned up')
def process_voice_command(command):
    """Dispatch a recognized voice command.

    Returns a (continue_voice_mode, message) tuple: the flag is False only for
    the exit command; *message* (or None) is what should be shown/spoken back.
    Unrecognized commands return (True, None) so normal handling proceeds.
    """
    if command in VOICE_COMMANDS:
        action = VOICE_COMMANDS[command]
        if action == "exit_voice_mode":
            return False, "Exiting voice mode."
        elif action == "save_chat":
            filename = save_chat()
            # Fix: interpolate the actual saved filename instead of a hard-coded
            # "(unknown)" placeholder that left `filename` unused.
            return True, f"Chat saved to {filename}"
        elif action == "reset_conversation":
            reset_conversation()
            return True, "Conversation has been reset."
    return True, None
async def get_user_input(prompt="You: "):
    """Asynchronously read one line of input, rendering the prompt in bold cyan."""
    prompt_style = Style.from_dict({
        'prompt': 'cyan bold',
    })
    return await PromptSession(style=prompt_style).prompt_async(prompt, multiline=False)
def setup_virtual_environment() -> Tuple[str, str]:
    """Ensure the 'code_execution_env' virtualenv exists in the CWD.

    Returns (venv_path, activate_script_path). Note: nothing is activated
    in-process here — only the platform-appropriate activation script path is
    computed for callers to use. Re-raises after logging on any failure.
    """
    venv_dir = os.path.join(os.getcwd(), "code_execution_env")
    try:
        if not os.path.exists(venv_dir):
            venv.create(venv_dir, with_pip=True)
        subdir, script = ("Scripts", "activate.bat") if sys.platform == "win32" else ("bin", "activate")
        return venv_dir, os.path.join(venv_dir, subdir, script)
    except Exception as e:
        logging.error(f"Error setting up virtual environment: {str(e)}")
        raise
# Initialize the Anthropic client (fail fast when the key is missing).
anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
if not anthropic_api_key:
    raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
client = Anthropic(api_key=anthropic_api_key)
# Initialize the Tavily client (used for web searches).
tavily_api_key = os.getenv("TAVILY_API_KEY")
if not tavily_api_key:
    raise ValueError("TAVILY_API_KEY not found in environment variables")
tavily = TavilyClient(api_key=tavily_api_key)
console = Console()
# Token tracking variables: per-role counters of input/output/cache tokens.
main_model_tokens = {'input': 0, 'output': 0, 'cache_write': 0, 'cache_read': 0}
tool_checker_tokens = {'input': 0, 'output': 0, 'cache_write': 0, 'cache_read': 0}
code_editor_tokens = {'input': 0, 'output': 0, 'cache_write': 0, 'cache_read': 0}
code_execution_tokens = {'input': 0, 'output': 0, 'cache_write': 0, 'cache_read': 0}
# When True, SEARCH blocks that don't match exactly fall back to fuzzy matching.
USE_FUZZY_SEARCH = True
# Set up the conversation memory (maintains context for MAINMODEL)
conversation_history = []
# Store file contents (part of the context for MAINMODEL)
file_contents = {}
# Code editor memory (maintains some context for CODEEDITORMODEL between calls)
code_editor_memory = []
# Files already present in code editor's context
code_editor_files = set()
# automode flag
automode = False
# Global dictionary to store running processes
running_processes = {}
# Constants
CONTINUATION_EXIT_PHRASE = "AUTOMODE_COMPLETE"
MAX_CONTINUATION_ITERATIONS = 25
MAX_CONTEXT_TOKENS = 200000  # Reduced to 200k tokens for context window
# Model ids for each role: main chat, tool checking, code editing, code execution.
MAINMODEL = "claude-3-5-sonnet-20241022"
TOOLCHECKERMODEL = "claude-3-5-sonnet-20241022"
CODEEDITORMODEL = "claude-3-5-sonnet-20241022"
CODEEXECUTIONMODEL = "claude-3-5-sonnet-20241022"
# System prompts
# BASE_SYSTEM_PROMPT: always-on instructions for the main model (capabilities,
# tool descriptions, usage/error-handling guidelines). File contents and the
# chain-of-thought prompt are appended by update_system_prompt().
BASE_SYSTEM_PROMPT = """
You are Claude, an AI assistant powered by Anthropic's Claude-3.5-Sonnet model, specialized in software development with access to a variety of tools and the ability to instruct and direct a coding agent and a code execution one. Your capabilities include:
<capabilities>
1. Creating and managing project structures
2. Writing, debugging, and improving code across multiple languages
3. Providing architectural insights and applying design patterns
4. Staying current with the latest technologies and best practices
5. Analyzing and manipulating files within the project directory
6. Performing web searches for up-to-date information
7. Executing code and analyzing its output within an isolated 'code_execution_env' virtual environment
8. Managing and stopping running processes started within the 'code_execution_env'
9. Running shell commands.
</capabilities>
Available tools and their optimal use cases:
<tools>
1. create_folders: Create new folders at the specified paths, including nested directories. Use this to create one or more directories in the project structure, even complex nested structures in a single operation.
2. create_files: Generate one or more new files with specified content. Strive to make the files as complete and useful as possible.
3. edit_and_apply_multiple: Examine and modify one or more existing files by instructing a separate AI coding agent. You are responsible for providing clear, detailed instructions for each file. When using this tool:
- Provide comprehensive context about the project, including recent changes, new variables or functions, and how files are interconnected.
- Clearly state the specific changes or improvements needed for each file, explaining the reasoning behind each modification.
- Include ALL the snippets of code to change, along with the desired modifications.
- Specify coding standards, naming conventions, or architectural patterns to be followed.
- Anticipate potential issues or conflicts that might arise from the changes and provide guidance on how to handle them.
- IMPORTANT: Always provide the input in the following format:
{
"files": [
{
"path": "app/templates/base.html",
"instructions": "Update the navigation bar for better UX."
},
{
"path": "app/routes.py",
"instructions": "Refactor the route handling for scalability."
}
],
"project_context": "Overall context about the project and desired changes."
}
- Ensure that the "files" key contains a list of dictionaries, even if you're only editing one file.
- Always include the "project_context" key with relevant information.
4. execute_code: Run Python code exclusively in the 'code_execution_env' virtual environment and analyze its output. Use this when you need to test code functionality or diagnose issues. Remember that all code execution happens in this isolated environment. This tool returns a process ID for long-running processes.
5. stop_process: Stop a running process by its ID. Use this when you need to terminate a long-running process started by the execute_code tool.
6. read_multiple_files: Read the contents of one or more existing files, supporting wildcards (e.g., '*.py') and recursive directory reading. This tool can handle single or multiple file paths, directory paths, and wildcard patterns. Use this when you need to examine or work with file contents, especially for multiple files or entire directories.
IMPORTANT: Before using the read_multiple_files tool, always check if the files you need are already in your context (system prompt).
If the file contents are already available to you, use that information directly instead of calling the read_multiple_files tool.
Only use the read_multiple_files tool for files that are not already in your context.
7. list_files: List all files and directories in a specified folder.
8. tavily_search: Perform a web search using the Tavily API for up-to-date information.
9. scan_folder: Scan a specified folder and create a Markdown file with the contents of all coding text files, excluding binary files and common ignored folders. Use this tool to generate comprehensive documentation of project structures.
10. run_shell_command: Execute a shell command and return its output. Use this tool when you need to run system commands or interact with the operating system. Ensure the command is safe and appropriate for the current operating system.
IMPORTANT: Use this tool to install dependencies in the code_execution_env when using the execute_code tool.
</tools>
<tool_usage_guidelines>
Tool Usage Guidelines:
- Always use the most appropriate tool for the task at hand.
- Provide detailed and clear instructions when using tools, especially for edit_and_apply_multiple.
- After making changes, always review the output to ensure accuracy and alignment with intentions.
- Use execute_code to run and test code within the 'code_execution_env' virtual environment, then analyze the results.
- For long-running processes, use the process ID returned by execute_code to stop them later if needed.
- Proactively use tavily_search when you need up-to-date information or additional context.
- When working with files, use read_multiple_files for both single and multiple file read making sure that the files are not already in your context.
</tool_usage_guidelines>
<error_handling>
Error Handling and Recovery:
- If a tool operation fails, carefully analyze the error message and attempt to resolve the issue.
- For file-related errors, double-check file paths and permissions before retrying.
- If a search fails, try rephrasing the query or breaking it into smaller, more specific searches.
- If code execution fails, analyze the error output and suggest potential fixes, considering the isolated nature of the environment.
- If a process fails to stop, consider potential reasons and suggest alternative approaches.
</error_handling>
<project_management>
Project Creation and Management:
1. Start by creating a root folder for new projects.
2. Create necessary subdirectories and files within the root folder.
3. Organize the project structure logically, following best practices for the specific project type.
</project_management>
Always strive for accuracy, clarity, and efficiency in your responses and actions. Your instructions must be precise and comprehensive. If uncertain, use the tavily_search tool or admit your limitations. When executing code, always remember that it runs in the isolated 'code_execution_env' virtual environment. Be aware of any long-running processes you start and manage them appropriately, including stopping them when they are no longer needed.
<tool_usage_best_practices>
When using tools:
1. Carefully consider if a tool is necessary before using it.
2. Ensure all required parameters are provided and valid.
3. When using edit_and_apply_multiple, always structure your input as a dictionary with "files" (a list of file dictionaries) and "project_context" keys.
4. Handle both successful results and errors gracefully.
5. Provide clear explanations of tool usage and results to the user.
</tool_usage_best_practices>
Remember, you are an AI assistant, and your primary goal is to help the user accomplish their tasks effectively and efficiently while maintaining the integrity and security of their development environment.
"""
# AUTOMODE_SYSTEM_PROMPT: appended (via .format) only while automode is active;
# {iteration_info} is filled in by update_system_prompt() with the current
# iteration count.
AUTOMODE_SYSTEM_PROMPT = """
You are currently in automode. Follow these guidelines:
<goal_setting>
1. Goal Setting:
- Set clear, achievable goals based on the user's request.
- Break down complex tasks into smaller, manageable goals.
</goal_setting>
<goal_execution>
2. Goal Execution:
- Work through goals systematically, using appropriate tools for each task.
- Utilize file operations, code writing, and web searches as needed.
- Always read a file before editing and review changes after editing.
</goal_execution>
<progress_tracking>
3. Progress Tracking:
- Provide regular updates on goal completion and overall progress.
- Use the iteration information to pace your work effectively.
</progress_tracking>
<task_breakdown>
Break Down Complex Tasks:
When faced with a complex task or project, break it down into smaller, manageable steps. Provide a clear outline of the steps involved, potential challenges, and how to approach each part of the task.
</task_breakdown>
<explanation_preference>
Prefer Answering Without Code:
When explaining concepts or providing solutions, prioritize clear explanations and pseudocode over full code implementations. Only provide full code snippets when explicitly requested or when it's essential for understanding.
</explanation_preference>
<code_review_process>
Code Review Process:
When reviewing code, follow these steps:
1. Understand the context and purpose of the code
2. Check for clarity and readability
3. Identify potential bugs or errors
4. Suggest optimizations or improvements
5. Ensure adherence to best practices and coding standards
6. Consider security implications
7. Provide constructive feedback with explanations
</code_review_process>
<project_planning>
Project Planning:
When planning a project, consider the following:
1. Define clear project goals and objectives
2. Break down the project into manageable tasks and subtasks
3. Estimate time and resources required for each task
4. Identify potential risks and mitigation strategies
5. Suggest appropriate tools and technologies
6. Outline a testing and quality assurance strategy
7. Consider scalability and future maintenance
</project_planning>
<security_review>
Security Review:
When conducting a security review, focus on:
1. Identifying potential vulnerabilities in the code
2. Checking for proper input validation and sanitization
3. Ensuring secure handling of sensitive data
4. Reviewing authentication and authorization mechanisms
5. Checking for secure communication protocols
6. Identifying any use of deprecated or insecure functions
7. Suggesting security best practices and improvements
</security_review>
Remember to apply these additional skills and processes when assisting users with their software development tasks and projects.
<tool_usage>
4. Tool Usage:
- Leverage all available tools to accomplish your goals efficiently.
- Prefer edit_and_apply_multiple for file modifications, applying changes in chunks for large edits.
- Use tavily_search proactively for up-to-date information.
</tool_usage>
<error_handling>
5. Error Handling:
- If a tool operation fails, analyze the error and attempt to resolve the issue.
- For persistent errors, consider alternative approaches to achieve the goal.
</error_handling>
<automode_completion>
6. Automode Completion:
- When all goals are completed, respond with "AUTOMODE_COMPLETE" to exit automode.
- Do not ask for additional tasks or modifications once goals are achieved.
</automode_completion>
<iteration_awareness>
7. Iteration Awareness:
- You have access to this {iteration_info}.
- Use this information to prioritize tasks and manage time effectively.
</iteration_awareness>
Remember: Focus on completing the established goals efficiently and effectively. Avoid unnecessary conversations or requests for additional tasks.
"""
def update_system_prompt(current_iteration: Optional[int] = None, max_iterations: Optional[int] = None) -> str:
    """Assemble the full system prompt for the main model.

    Concatenates the base prompt, the cached file contents, the automode prompt
    (with iteration info) when automode is active, and the chain-of-thought
    instructions.
    """
    global file_contents
    chain_of_thought_prompt = """
Answer the user's request using relevant tools (if they are available). Before calling a tool, do some analysis within <thinking></thinking> tags. First, think about which of the provided tools is the relevant tool to answer the user's request. Second, go through each of the required parameters of the relevant tool and determine if the user has directly provided or given enough information to infer a value. When deciding if the parameter can be inferred, carefully consider all the context to see if it supports a specific value. If all of the required parameters are present or can be reasonably inferred, close the thinking tag and proceed with the tool call. BUT, if one of the values for a required parameter is missing, DO NOT invoke the function (not even with fillers for the missing params) and instead, ask the user to provide the missing parameters. DO NOT ask for more information on optional parameters if it is not provided.
Do not reflect on the quality of the returned search results in your response.
IMPORTANT: Before using the read_multiple_files tool, always check if the files you need are already in your context (system prompt).
If the file contents are already available to you, use that information directly instead of calling the read_multiple_files tool.
Only use the read_multiple_files tool for files that are not already in your context.
When instructing to read a file, always use the full file path.
"""
    listed_files = "\n".join(file_contents.keys())
    file_contents_prompt = (
        f"\n\nFiles already in your context:\n{listed_files}\n\nFile Contents:\n"
        + "".join(f"\n--- {path} ---\n{content}\n" for path, content in file_contents.items())
    )
    if not automode:
        return BASE_SYSTEM_PROMPT + file_contents_prompt + "\n\n" + chain_of_thought_prompt
    iteration_info = ""
    if current_iteration is not None and max_iterations is not None:
        iteration_info = f"You are currently on iteration {current_iteration} out of {max_iterations} in automode."
    return BASE_SYSTEM_PROMPT + file_contents_prompt + "\n\n" + AUTOMODE_SYSTEM_PROMPT.format(iteration_info=iteration_info) + "\n\n" + chain_of_thought_prompt
def create_folders(paths):
    """Create each directory in *paths* (nested paths allowed).

    Returns a newline-joined, per-path status report; failures are reported
    in-line rather than raised.
    """
    outcomes = []
    for folder in paths:
        try:
            # exist_ok=True: creating an already-existing folder is a success.
            os.makedirs(folder, exist_ok=True)
            outcomes.append(f"Folder(s) created: {folder}")
        except Exception as e:
            outcomes.append(f"Error creating folder(s) {folder}: {str(e)}")
    return "\n".join(outcomes)
def create_files(files):
    """Create one or more files (creating parent directories as needed).

    Accepts a single path string, one file dict ({'path', 'content'}), or a
    list of file dicts. Each created file's content is recorded in the global
    file_contents cache so it becomes part of the model's context. Returns a
    newline-joined status report; individual failures are reported in-line.
    """
    global file_contents
    # Normalize the three accepted input shapes into a list of dicts.
    if isinstance(files, str):
        specs = [{"path": files, "content": ""}]
    elif isinstance(files, dict):
        specs = [files]
    elif isinstance(files, list):
        specs = files
    else:
        return "Error: Invalid input type for create_files. Expected string, dict, or list."
    report = []
    for spec in specs:
        try:
            if not isinstance(spec, dict):
                report.append(f"Error: Invalid file specification: {spec}")
                continue
            path = spec.get('path')
            if path is None:
                report.append(f"Error: Missing 'path' for file")
                continue
            content = spec.get('content', '')
            parent = os.path.dirname(path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            with open(path, 'w') as handle:
                handle.write(content)
            file_contents[path] = content
            report.append(f"File created and added to system prompt: {path}")
        except Exception as e:
            report.append(f"Error creating file: {str(e)}")
    return "\n".join(report)
async def generate_edit_instructions(file_path, file_content, instructions, project_context, full_file_contents):
    """Ask the code-editor model for SEARCH/REPLACE edit blocks for one file.

    Builds a system prompt from the file, the edit instructions, the project
    context, prior edit memory, and the other project files, then calls the
    code-editor model (with prompt caching). Returns a list of parsed edit
    instructions, or [] on any validation/API failure. Side effects: updates
    code_editor_tokens, appends to code_editor_memory, adds file_path to
    code_editor_files.
    """
    global code_editor_tokens, code_editor_memory, code_editor_files
    try:
        # Prepare memory context (maintains some context between calls)
        memory_context = "\n".join([f"Memory {i+1}:\n{mem}" for i, mem in enumerate(code_editor_memory)])
        # Prepare full file contents context, excluding the file being edited if it's already in code_editor_files
        full_file_contents_context = "\n\n".join([
            f"--- {path} ---\n{content}" for path, content in full_file_contents.items()
            if path != file_path or path not in code_editor_files
        ])
        system_prompt = f"""
You are an expert coding assistant specializing in web development (CSS, JavaScript, React, Tailwind, Node.JS, Hugo/Markdown). Review the following information carefully:
1. File Content:
{file_content}
2. Edit Instructions:
{instructions}
3. Project Context:
{project_context}
4. Previous Edit Memory:
{memory_context}
5. Full Project Files Context:
{full_file_contents_context}
Follow this process to generate edit instructions:
1. <CODE_REVIEW>
Analyze the existing code thoroughly. Describe how it works, identifying key components,
dependencies, and potential issues. Consider the broader project context and previous edits.
</CODE_REVIEW>
2. <PLANNING>
Construct a plan to implement the requested changes. Consider:
- How to avoid code duplication (DRY principle)
- Balance between maintenance and flexibility
- Relevant frameworks or libraries
- Security implications
- Performance impacts
Outline discrete changes and suggest small tests for each stage.
</PLANNING>
3. Finally, generate SEARCH/REPLACE blocks for each necessary change:
- Use enough context to uniquely identify the code to be changed
- Maintain correct indentation and formatting
- Focus on specific, targeted changes
- Ensure consistency with project context and previous edits
USE THIS FORMAT FOR CHANGES:
<SEARCH>
Code to be replaced (with sufficient context)
</SEARCH>
<REPLACE>
New code to insert
</REPLACE>
IMPORTANT: ONLY RETURN CODE INSIDE THE <SEARCH> AND <REPLACE> TAGS. DO NOT INCLUDE ANY OTHER TEXT, COMMENTS, or Explanations. FOR EXAMPLE:
<SEARCH>
def old_function():
    pass
</SEARCH>
<REPLACE>
def new_function():
    print("New Functionality")
</REPLACE>
"""
        response = client.beta.prompt_caching.messages.create(
            model=CODEEDITORMODEL,
            max_tokens=8000,
            system=[
                {
                    "type": "text",
                    "text": system_prompt,
                    # Cache the large system prompt between calls.
                    "cache_control": {"type": "ephemeral"}
                }
            ],
            messages=[
                {"role": "user", "content": "Generate SEARCH/REPLACE blocks for the necessary changes."}
            ],
            extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"}
        )
        # Update token usage for code editor.
        # NOTE(review): input/output accumulate (+=) while cache counters are
        # overwritten (=) — presumably intentional (last-call cache stats); confirm.
        code_editor_tokens['input'] += response.usage.input_tokens
        code_editor_tokens['output'] += response.usage.output_tokens
        code_editor_tokens['cache_write'] = response.usage.cache_creation_input_tokens
        code_editor_tokens['cache_read'] = response.usage.cache_read_input_tokens
        ai_response_text = response.content[0].text  # Extract the text
        # If ai_response_text is a list (multi-part content), join the text parts.
        if isinstance(ai_response_text, list):
            ai_response_text = ' '.join(
                item['text'] if isinstance(item, dict) and 'text' in item else str(item)
                for item in ai_response_text
            )
        elif not isinstance(ai_response_text, str):
            ai_response_text = str(ai_response_text)
        # Validate AI response contains SEARCH/REPLACE blocks before parsing.
        try:
            if not validate_ai_response(ai_response_text):
                raise ValueError("AI response does not contain valid SEARCH/REPLACE blocks")
        except ValueError as ve:
            logging.error(f"Validation failed: {ve}")
            return []  # Return empty list to indicate failure
        # Parse the response to extract SEARCH/REPLACE blocks
        edit_instructions = parse_search_replace_blocks(ai_response_text)
        if not edit_instructions:
            raise ValueError("No valid edit instructions were generated")
        # Update code editor memory
        code_editor_memory.append(f"Edit Instructions for {file_path}:\n{ai_response_text}")
        # Add the file to code_editor_files set
        code_editor_files.add(file_path)
        return edit_instructions
    except Exception as e:
        console.print(f"Error in generating edit instructions: {str(e)}", style="bold red")
        logging.error(f"Error in generating edit instructions: {str(e)}")
        return []  # Return empty list if any exception occurs
def validate_ai_response(response_text):
    """Ensure the AI response contains well-formed SEARCH/REPLACE blocks.

    Accepts either a string or a list of message parts (dicts carrying a
    'text' key); a list is flattened to a single space-joined string before
    validation. Returns True when both tag pairs are present; raises
    ValueError otherwise.
    """
    if isinstance(response_text, list):
        # Flatten a list of content parts into one string for validation.
        try:
            pieces = [
                part['text'] if isinstance(part, dict) and 'text' in part else str(part)
                for part in response_text
            ]
            response_text = ' '.join(pieces)
        except Exception as e:
            logging.error(f"Error processing response_text list: {str(e)}")
            raise ValueError("Invalid format in response_text list.")
    elif not isinstance(response_text, str):
        logging.debug(f"validate_ai_response received type {type(response_text)}: {response_text}")
        raise ValueError(f"Invalid type for response_text: {type(response_text)}. Expected string.")

    logging.debug(f"Processed response_text for validation: {response_text}")

    # Both tag pairs must appear somewhere in the text for it to be usable.
    for tag, error_message in (
        ('SEARCH', "AI response does not contain any <SEARCH> blocks"),
        ('REPLACE', "AI response does not contain any <REPLACE> blocks"),
    ):
        if re.search(rf'<{tag}>.*?</{tag}>', response_text, re.DOTALL) is None:
            raise ValueError(error_message)
    return True
def parse_search_replace_blocks(response_text, use_fuzzy=USE_FUZZY_SEARCH):
    """
    Parse the response text for SEARCH/REPLACE blocks.

    Args:
        response_text (str): The text containing SEARCH/REPLACE blocks.
        use_fuzzy (bool): Whether to use fuzzy matching for search blocks.

    Returns:
        list: Dicts with 'search', 'replace', and 'similarity' keys, one per
        SEARCH/REPLACE pair found in the text.
    """
    block_pattern = r'<SEARCH>\s*(.*?)\s*</SEARCH>\s*<REPLACE>\s*(.*?)\s*</REPLACE>'
    parsed = []
    for raw_search, raw_replace in re.findall(block_pattern, response_text, re.DOTALL):
        search_text = raw_search.strip()
        replace_text = raw_replace.strip()
        score = 1.0  # exact match assumed unless fuzzy scoring kicks in
        if use_fuzzy and search_text not in response_text:
            # Score the search text against every <SEARCH> target found in
            # the response; 0.0 when nothing comes close (cutoff 0.6).
            candidates = [
                candidate.strip()
                for candidate in re.findall(r'<SEARCH>\s*(.*?)\s*</SEARCH>', response_text, re.DOTALL)
            ]
            closest = difflib.get_close_matches(search_text, candidates, n=1, cutoff=0.6)
            if closest:
                score = difflib.SequenceMatcher(None, search_text, closest[0]).ratio()
            else:
                score = 0.0
        parsed.append({
            'search': search_text,
            'replace': replace_text,
            'similarity': score
        })
    return parsed
async def edit_and_apply_multiple(files, project_context, is_automode=False):
    """
    Generate and apply AI-produced edit instructions for a batch of files.

    Args:
        files: Iterable of dicts, each with 'path' and 'instructions' keys
            (normalized by validate_files_structure before use).
        project_context: Free-form context string forwarded to
            generate_edit_instructions.
        is_automode: Accepted from callers; not read inside this function.

    Returns:
        tuple: (results, console_output) where results is a list of per-file
        status dicts (status is one of 'success', 'partial_success',
        'no_changes', 'no_instructions', 'error') and console_output is the
        newline-joined text accumulated while applying edits.
    """
    global file_contents
    results = []
    console_outputs = []
    logging.debug(f"edit_and_apply_multiple called with files: {files}")
    logging.debug(f"Project context: {project_context}")
    # Normalize/validate the files argument; bail out early on bad input.
    try:
        files = validate_files_structure(files)
    except ValueError as ve:
        logging.error(f"Validation error: {ve}")
        return [], f"Error: {ve}"
    logging.info(f"Starting edit_and_apply_multiple with {len(files)} file(s)")
    for file in files:
        path = file['path']
        instructions = file['instructions']
        logging.info(f"Processing file: {path}")
        try:
            # Prefer the cached content; fall back to disk on a cache miss
            # (note: an empty cached string is also treated as a miss here).
            original_content = file_contents.get(path, "")
            if not original_content:
                logging.info(f"Reading content for file: {path}")
                with open(path, 'r') as f:
                    original_content = f.read()
                file_contents[path] = original_content
            logging.info(f"Generating edit instructions for file: {path}")
            edit_instructions = await generate_edit_instructions(path, original_content, instructions, project_context, file_contents)
            logging.debug(f"AI response for {path}: {edit_instructions}")
            # Defensive check: downstream code indexes into dicts by key.
            if not isinstance(edit_instructions, list) or not all(isinstance(item, dict) for item in edit_instructions):
                raise ValueError("Invalid edit_instructions format. Expected a list of dictionaries.")
            if edit_instructions:
                # Show the operator every generated block before applying.
                console.print(Panel(f"File: {path}\nThe following SEARCH/REPLACE blocks have been generated:", title="Edit Instructions", style="cyan"))
                for i, block in enumerate(edit_instructions, 1):
                    console.print(f"Block {i}:")
                    console.print(Panel(f"SEARCH:\n{block['search']}\n\nREPLACE:\n{block['replace']}\nSimilarity: {block['similarity']:.2f}", expand=False))
                logging.info(f"Applying edits to file: {path}")
                edited_content, changes_made, failed_edits, console_output = await apply_edits(path, edit_instructions, original_content)
                console_outputs.append(console_output)
                if changes_made:
                    # Keep the in-memory cache in sync with what was written.
                    file_contents[path] = edited_content
                    console.print(Panel(f"File contents updated in system prompt: {path}", style="green"))
                    logging.info(f"Changes applied to file: {path}")
                    # Partial success when some blocks could not be matched.
                    if failed_edits:
                        logging.warning(f"Some edits failed for file: {path}")
                        logging.debug(f"Failed edits for {path}: {failed_edits}")
                        results.append({
                            "path": path,
                            "status": "partial_success",
                            "message": f"Some changes applied to {path}, but some edits failed.",
                            "failed_edits": failed_edits,
                            "edited_content": edited_content
                        })
                    else:
                        results.append({
                            "path": path,
                            "status": "success",
                            "message": f"All changes successfully applied to {path}",
                            "edited_content": edited_content
                        })
                else:
                    logging.warning(f"No changes applied to file: {path}")
                    results.append({
                        "path": path,
                        "status": "no_changes",
                        "message": f"No changes could be applied to {path}. Please review the edit instructions and try again."
                    })
            else:
                logging.warning(f"No edit instructions generated for file: {path}")
                results.append({
                    "path": path,
                    "status": "no_instructions",
                    "message": f"No edit instructions generated for {path}"
                })
        except Exception as e:
            # One failing file must not abort the rest of the batch; record
            # the error and continue with the next file.
            logging.error(f"Error editing/applying to file {path}: {str(e)}")
            logging.exception("Full traceback:")
            error_message = f"Error editing/applying to file {path}: {str(e)}"
            results.append({
                "path": path,
                "status": "error",
                "message": error_message
            })
            console_outputs.append(error_message)
    logging.info("Completed edit_and_apply_multiple")
    logging.debug(f"Results: {results}")
    return results, "\n".join(console_outputs)
async def apply_edits(file_path, edit_instructions, original_content):
    """
    Apply SEARCH/REPLACE edit instructions to a file's content.

    Args:
        file_path (str): Path of the file being edited (used for diff display
            and for writing the result back to disk).
        edit_instructions (list): Dicts with 'search', 'replace' and
            'similarity' keys, as produced by parse_search_replace_blocks.
        original_content (str): The content the edits are applied against.

    Returns:
        tuple: (edited_content, changes_made, failed_edits, console_output)
        where console_output is the newline-joined progress messages.
    """
    changes_made = False
    edited_content = original_content
    total_edits = len(edit_instructions)
    failed_edits = []
    console_output = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        console=console
    ) as progress:
        edit_task = progress.add_task("[cyan]Applying edits...", total=total_edits)

        for i, edit in enumerate(edit_instructions, 1):
            search_content = edit['search'].strip()
            replace_content = edit['replace'].strip()
            similarity = edit['similarity']

            # Try an exact (escaped-literal) match first.
            match = re.search(re.escape(search_content), edited_content)
            if match is None and USE_FUZZY_SEARCH and similarity >= 0.8:
                # Fuzzy fallback: locate the closest span when the exact
                # search text is not present verbatim.
                best_match = difflib.get_close_matches(search_content, [edited_content], n=1, cutoff=0.6)
                if best_match:
                    match = re.search(re.escape(best_match[0]), edited_content)

            if match:
                # Strip stray tag markers the model may have leaked into the
                # replacement text.
                replace_content_cleaned = re.sub(r'</?SEARCH>|</?REPLACE>', '', replace_content)
                # Splice by span instead of re.sub: re.sub on the original
                # pattern silently did nothing on the fuzzy path (the exact
                # pattern is absent — that's why the fallback ran), and it
                # also interpreted backslash escapes in the replacement text.
                edited_content = (
                    edited_content[:match.start()]
                    + replace_content_cleaned
                    + edited_content[match.end():]
                )
                changes_made = True

                # Display the diff for this edit.
                diff_result = generate_diff(search_content, replace_content, file_path)
                console.print(Panel(diff_result, title=f"Changes in {file_path} ({i}/{total_edits}) - Similarity: {similarity:.2f}", style="cyan"))
                console_output.append(f"Edit {i}/{total_edits} applied successfully")
            else:
                message = f"Edit {i}/{total_edits} not applied: content not found (Similarity: {similarity:.2f})"
                console_output.append(message)
                console.print(Panel(message, style="yellow"))
                failed_edits.append(f"Edit {i}: {search_content}")

            progress.update(edit_task, advance=1)

    if not changes_made:
        message = "No changes were applied. The file content already matches the desired state."
        console_output.append(message)
        console.print(Panel(message, style="green"))
    else:
        # Persist the edited content only when something actually changed.
        with open(file_path, 'w') as file:
            file.write(edited_content)
        message = f"Changes have been written to {file_path}"
        console_output.append(message)
        console.print(Panel(message, style="green"))

    return edited_content, changes_made, failed_edits, "\n".join(console_output)
def highlight_diff(diff_text):
    """Render unified-diff text as a monokai-themed, line-numbered Syntax panel."""
    rendered = Syntax(diff_text, "diff", theme="monokai", line_numbers=True)
    return rendered
def generate_diff(original, new, path):
    """Build a unified diff of two texts and return it syntax-highlighted."""
    before_lines = original.splitlines(keepends=True)
    after_lines = new.splitlines(keepends=True)
    diff_text = ''.join(difflib.unified_diff(
        before_lines,
        after_lines,
        fromfile=f"a/{path}",
        tofile=f"b/{path}",
        n=3
    ))
    return highlight_diff(diff_text)
async def execute_code(code, timeout=10):
global running_processes
venv_path, activate_script = setup_virtual_environment()
# Input validation
if not isinstance(code, str):
raise ValueError("code must be a string")
if not isinstance(timeout, (int, float)):
raise ValueError("timeout must be a number")
# Generate a unique identifier for this process
process_id = f"process_{len(running_processes)}"
# Write the code to a temporary file
try:
with open(f"{process_id}.py", "w") as f:
f.write(code)
except IOError as e:
return process_id, f"Error writing code to file: {str(e)}"
# Prepare the command to run the code
if sys.platform == "win32":