Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 102 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ AI Agents controlling Kerbal Space Program.
- **Kerbal Space Program** installed
- **Python 3.8+** installed
- **Poetry** for dependency management
- **OpenAI API key** for the language models

## Setup
## Installation

### 1. Install Poetry (if not already installed)
```bash
Expand All @@ -32,31 +33,117 @@ cd KOSMOS
poetry install
```

### 4. Usage
## Getting Started

KOSMOS uses OpenAI's GPT-4 as the language model. You need to have an OpenAI API key to use KOSMOS. You can get one from [here](https://platform.openai.com/api-keys).

After the installation process, you can run KOSMOS by:

```python
from kosmos import Kosmos

openai_api_key = "YOUR_API_KEY"

kosmos = Kosmos(
openai_api_key=openai_api_key,
)

# Start mission execution
kosmos.learn()
```

If you are running KOSMOS for the first time, make sure to:
1. **Launch Kerbal Space Program**
2. **Load a scenario or flight** (make sure you have an active vessel)
3. **Start the kRPC server** in KSP (usually via the kRPC menu)
4. **Run the Python script:**
4. **Run the Python script**

## Usage Examples

### Basic Mission Execution
```python
from kosmos import Kosmos

kosmos = Kosmos(
openai_api_key="YOUR_API_KEY",
)

# Start lifelong learning and mission execution
kosmos.learn()
```

### Resume from Checkpoint
If you stop the learning process and want to resume from a checkpoint later:

```python
from kosmos import Kosmos

kosmos = Kosmos(
openai_api_key="YOUR_API_KEY",
checkpoint_dir="YOUR_CKPT_DIR",
resume=True,
)

# Resume mission from checkpoint
kosmos.learn()
```

### Execute Specific Task
If you want to run KOSMOS for a specific task:

```python
from kosmos import Kosmos

# First instantiate KOSMOS with maneuver library
kosmos = Kosmos(
openai_api_key="YOUR_API_KEY",
maneuver_library_dir="./maneuver_library/trial1", # Load learned maneuvers
checkpoint_dir="YOUR_CKPT_DIR",
resume=False,
)

# Run task decomposition
task = "Launch to orbit" # e.g. "Launch to orbit", "Land on Mun", "Dock with space station"
sub_goals = kosmos.decompose_task(task=task)

# Execute the sub-goals
kosmos.execute_task(sub_goals=sub_goals)
```

## Quick Start

1. **Set your OpenAI API key:**
```bash
export OPENAI_API_KEY="your-api-key-here"
```

2. **Launch KSP and start kRPC server**

3. **Run KOSMOS:**
```bash
poetry run python main.py
```

## Project Structure
- `main.py` - Basic vessel position streaming example
- `pyproject.toml` - Poetry configuration and dependencies
- `poetry.lock` - Locked dependency versions
- `main.py` - Main entry point
- `kosmos/` - Core KOSMOS package
- `agents/` - AI agents (Flight, Mission Control, Maneuver, Audit)
- `env/` - KSP environment interface
- `utils/` - Utility functions
- `control_primitives/` - Executable maneuver implementations
- `control_primitives_context/` - Context examples for AI agents
- `examples/` - Usage examples
- `checkpoint/` - Checkpoint directory (created automatically)

## Example Output
The basic script will stream your vessel's position coordinates in real-time:
```
0.5.4
(2688577.068673832, -7.589481473858227, 465412.3802019775)
(2688577.068673832, -7.589481473858227, 465412.3802019775)
...
```
## Available Agents

- **FlightAgent** ✅ - Executes flight maneuvers and controls vessels
- **MissionControlAgent** ⚠️ - Plans and coordinates mission phases
- **ManeuverAgent** ⚠️ - Manages specific maneuver execution
- **AuditAgent** ⚠️ - Monitors and validates mission progress

## Development

To add new dependencies:
```bash
poetry add package-name
Expand All @@ -74,4 +161,4 @@ poetry run python main.py
4. Submit a pull request

## License
See [LICENSE](LICENSE) file for details.
See [LICENSE](LICENSE) file for details.
1 change: 1 addition & 0 deletions kosmos/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .kosmos import Kosmos
2 changes: 1 addition & 1 deletion kosmos/agents/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_vessel_telemetry(self, env=None):

def construct_system_message(self, maneuvers=[]):
print(f"🔍 DEBUG: FlightAgent constructing system message with {len(maneuvers)} maneuvers")
system_template = load_prompt("new_flight_template")
system_template = load_prompt("flight_template")
# Load MechJeb documentation
mechjeb_docs = load_prompt("mechjeb_readmellm")
# Load kRPC documentation
Expand Down
17 changes: 17 additions & 0 deletions kosmos/control_primitives/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import importlib.util
import os
import kosmos.utils as U

def load_control_primitives(primitive_names=None):
package_path = importlib.util.find_spec("kosmos").submodule_search_locations[0]
if primitive_names is None:
primitive_names = [
primitive[:-3]
for primitive in os.listdir(f"{package_path}/control_primitives")
if primitive.endswith(".py") and primitive != "__init__.py"
]
primitives = [
U.load_text(f"{package_path}/control_primitives/{primitive_name}.py")
for primitive_name in primitive_names
]
return primitives
80 changes: 80 additions & 0 deletions kosmos/control_primitives/dock_with_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
async def dock_with_target(conn, approach_distance=50, final_approach_speed=0.5):
"""
Dock with target vessel (similar to complex multi-step operations)
"""
global _dock_fail_count

vessel = conn.space_center.active_vessel
control = vessel.control
space_center = conn.space_center

if not space_center.target_vessel:
print("No target vessel selected for docking")
return False

target = space_center.target_vessel

try:
# Phase 1: Close approach
control.rcs = True
control.sas = True
control.sas_mode = space_center.SASMode.target

# Get close to target
while True:
target_distance = vessel.orbital_reference_frame.position(target).magnitude

if target_distance < approach_distance:
break

# Thrust toward target
if target_distance > 1000:
control.throttle = 0.1
elif target_distance > 100:
control.throttle = 0.05
else:
control.throttle = 0.01

time.sleep(0.5)

control.throttle = 0.0
print(f"Close approach achieved: {target_distance:.1f}m")

# Phase 2: Final docking approach
control.sas_mode = space_center.SASMode.stability_assist

# Find docking ports
my_port = find_docking_port(vessel)
target_port = find_docking_port(target)

if not my_port or not target_port:
print("Could not find docking ports on vessels")
return False

# Approach target port
while not vessel.parts.docking_ports[0].state.name == 'docked':
# Simple approach logic
time.sleep(1)

# In real implementation, this would use precise navigation
# This is simplified for the primitive example

print("Docking successful!")
control.rcs = False
return True

except Exception as err:
control.throttle = 0.0
control.rcs = False
print(f"Docking failed: {err}")
_dock_fail_count += 1
if _dock_fail_count > 3:
raise Exception("Docking failed too many times")
return False

def find_docking_port(vessel):
"""Find available docking port on vessel"""
for port in vessel.parts.docking_ports:
if port.state.name == 'ready':
return port
return None
64 changes: 64 additions & 0 deletions kosmos/control_primitives/execute_maneuver_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
async def execute_maneuver_node(conn, node=None, tolerance=1.0):
"""
Execute a maneuver node (similar to smeltItem - complex process with validation)
"""
global _burn_fail_count

vessel = conn.space_center.active_vessel
control = vessel.control

if node is None:
if not control.nodes:
print("No maneuver nodes to execute")
return False
node = control.nodes[0]

try:
# Calculate burn time
isp = vessel.specific_impulse * 9.82 # Convert to m/s
mass = vessel.mass
delta_v = node.delta_v
burn_time = mass * isp * (1 - math.exp(-delta_v / isp)) / vessel.available_thrust

# Wait until burn time
burn_ut = node.ut - (burn_time / 2)
lead_time = 5 # Start orienting 5 seconds early

# Warp to maneuver time
conn.space_center.warp_to(burn_ut - lead_time)

# Orient to maneuver
control.sas = True
control.sas_mode = conn.space_center.SASMode.maneuver

# Wait for orientation
time.sleep(3)

# Execute burn
remaining_delta_v = node.remaining_delta_v
control.throttle = 1.0

while remaining_delta_v > tolerance:
remaining_delta_v = node.remaining_delta_v

# Throttle down as we approach target
if remaining_delta_v < 10:
control.throttle = 0.1
elif remaining_delta_v < 50:
control.throttle = 0.5

time.sleep(0.1)

control.throttle = 0.0
node.remove()

print(f"Maneuver executed successfully, {remaining_delta_v:.1f} m/s remaining")
return True

except Exception as err:
control.throttle = 0.0
print(f"Maneuver execution failed: {err}")
_burn_fail_count += 1
if _burn_fail_count > 5:
raise Exception("Maneuver execution failed too many times")
return False
32 changes: 32 additions & 0 deletions kosmos/control_primitives/explore_until_body_found.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
async def explore_until_body_found(conn, target_body_name, max_time=3600):
"""
Explore space until target body is found (similar to exploreUntil)
"""
space_center = conn.space_center
vessel = space_center.active_vessel

# Check if we can already see the target
if target_body_name in space_center.bodies:
target_body = space_center.bodies[target_body_name]
print(f"Found {target_body_name}")
return target_body

start_time = time.time()

print(f"Exploring space to find {target_body_name}...")

# Simple exploration - change orbital plane and look around
while time.time() - start_time < max_time:
# In real implementation, this would use proper orbital mechanics
# to search for celestial bodies

# Check if we found the target
if target_body_name in space_center.bodies:
target_body = space_center.bodies[target_body_name]
print(f"Found {target_body_name} after {time.time() - start_time:.1f} seconds")
return target_body

time.sleep(10) # Search interval

print(f"Could not find {target_body_name} within {max_time} seconds")
return None
Loading