Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add single unit unload example bot #108

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ class MyBot(sc2.BotAI):
self.raw_affects_selection = True
```

### `enable_feature_layer`
Setting this to true allows interaction with the UI
```python
class MyBot(sc2.BotAI):
def __init__(self):
self.enable_feature_layer = True
```

### `distance_calculation_method`
The distance calculation method:
- 0 for raw python
Expand Down
123 changes: 123 additions & 0 deletions examples/protoss/single_unit_unload_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import sys, os

sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))

from typing import Union
from loguru import logger

import sc2
from sc2 import Race, Difficulty
from sc2.ids.unit_typeid import UnitTypeId
from sc2.player import Bot, Computer
from sc2.unit import Unit
from sc2.units import Units

from s2clientprotocol import raw_pb2 as raw_pb
from s2clientprotocol import sc2api_pb2 as sc_pb
from s2clientprotocol import ui_pb2 as ui_pb


class SingleUnitUnloadBot(sc2.BotAI):
def __init__(self):
self.raw_affects_selection = True
self.enable_feature_layer = True

async def on_start(self):
self.client.game_step = 8
self.load_unit_types = {
UnitTypeId.ZEALOT,
UnitTypeId.STALKER,
UnitTypeId.DARKTEMPLAR,
UnitTypeId.HIGHTEMPLAR,
}

async def unload_unit(self, transporter_unit: Unit, unload_unit: Union[int, Unit]):
assert isinstance(transporter_unit, Unit)
assert isinstance(unload_unit, (int, Unit))
assert hasattr(self, "raw_affects_selection") and self.raw_affects_selection is True
assert hasattr(self, "enable_feature_layer") and self.enable_feature_layer is True
if isinstance(unload_unit, Unit):
unload_unit_tag = unload_unit.tag
else:
unload_unit_tag = unload_unit

# TODO Change unit.py passengers to return a List[Unit] instead of Set[Unit] ? Then I don't have to loop over '._proto'
unload_unit_index = next(
(index for index, unit in enumerate(transporter_unit._proto.passengers) if unit.tag == unload_unit_tag),
None
)

if unload_unit_index is None:
logger.info(f"Unable to find unit {unload_unit} in transporter {transporter_unit}")
return

logger.info(f"Unloading unit at index: {unload_unit_index}")
await self.client._execute(
action=sc_pb.RequestAction(
actions=[
sc_pb.Action(
action_raw=raw_pb.ActionRaw(
unit_command=raw_pb.ActionRawUnitCommand(ability_id=0, unit_tags=[transporter_unit.tag])
)
),
sc_pb.Action(
action_ui=ui_pb.ActionUI(
cargo_panel=ui_pb.ActionCargoPanelUnload(unit_index=unload_unit_index)
)
),
]
)
)

async def on_step(self, iteration):
# Spawn units
logger.info(f"Spawning units")
await self.client.debug_create_unit(
[
[UnitTypeId.WARPPRISM, 1, self.game_info.map_center, 1],
[UnitTypeId.ZEALOT, 1, self.game_info.map_center, 1],
[UnitTypeId.STALKER, 1, self.game_info.map_center, 1],
[UnitTypeId.DARKTEMPLAR, 1, self.game_info.map_center, 1],
[UnitTypeId.HIGHTEMPLAR, 1, self.game_info.map_center, 1],
]
)
# Load units into prism
await self._advance_steps(50)
prism = self.units(UnitTypeId.WARPPRISM)[0]
my_zealot = self.units(UnitTypeId.ZEALOT)[0]
my_units = self.units(self.load_unit_types)
logger.info(f"Loading units into prism: {my_units}")
for unit in my_units:
unit.smart(prism)

# Unload single unit - here: zealot
await self._advance_steps(50)
assert self.units(self.load_unit_types).amount == 0
prism: Unit = self.units(UnitTypeId.WARPPRISM)[0]
await self.unload_unit(prism, my_zealot)
# Also works:
# await self.unload_unit(prism, my_zealot.tag)

await self._advance_steps(50)
my_units = self.units(self.load_unit_types)
assert my_units.amount == 1, f"{my_units}"
my_zealots = self.units(UnitTypeId.ZEALOT)
assert my_zealots.amount == 1, f"{my_zealots}"
assert my_zealots[0].tag == my_zealot.tag

logger.info("Everything ran as expected. Terminating.")
await self.client.leave()


def main():
sc2.run_game(
sc2.maps.get("2000AtmospheresAIE"),
[Bot(Race.Protoss, SingleUnitUnloadBot()),
Computer(Race.Terran, Difficulty.Medium)],
realtime=False,
save_replay_as="PvT.SC2Replay",
)


if __name__ == "__main__":
main()
3 changes: 1 addition & 2 deletions examples/zerg/expand_everywhere.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))

import numpy as np
from sc2.position import Point2, Point3

import sc2
Expand Down Expand Up @@ -76,7 +75,7 @@ async def on_building_construction_complete(self, unit: Unit):

def main():
sc2.run_game(
sc2.maps.get("AcropolisLE"),
sc2.maps.get("2000AtmospheresAIE"),
[Bot(Race.Zerg, ExpandEverywhere()), Computer(Race.Terran, Difficulty.Medium)],
realtime=False,
save_replay_as="ZvT.SC2Replay",
Expand Down
42 changes: 26 additions & 16 deletions sc2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from s2clientprotocol import query_pb2 as query_pb
from s2clientprotocol import raw_pb2 as raw_pb
from s2clientprotocol import sc2api_pb2 as sc_pb
from s2clientprotocol import common_pb2 as common_pb

from .action import combine_actions
from .data import ActionResult, ChatChannel, Race, Result, Status
Expand Down Expand Up @@ -41,12 +42,19 @@ def __init__(self, ws):

self._renderer = None
self.raw_affects_selection = False
self.enable_feature_layer = False

@property
def in_game(self):
return self._status in {Status.in_game, Status.in_replay}

async def join_game(self, name=None, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
feature_layer = None
if self.enable_feature_layer:
feature_layer = sc_pb.SpatialCameraSetup(
resolution=common_pb.Size2DI(x=1, y=1),
minimap_resolution=common_pb.Size2DI(x=1, y=1),
)
ifopts = sc_pb.InterfaceOptions(
raw=True,
score=True,
Expand All @@ -55,6 +63,7 @@ async def join_game(self, name=None, race=None, observed_player_id=None, portcon
raw_affects_selection=self.raw_affects_selection,
raw_crop_to_playable_area=False,
show_placeholders=True,
feature_layer=feature_layer,
)

if rgb_render_config:
Expand Down Expand Up @@ -197,7 +206,9 @@ async def actions(self, actions, return_successes=False):
return [ActionResult(r) for r in res.action.result if ActionResult(r) != ActionResult.Success]

async def query_pathing(
self, start: Union[Unit, Point2, Point3], end: Union[Point2, Point3]
self,
start: Union[Unit, Point2, Point3],
end: Union[Point2, Point3],
) -> Optional[Union[int, float]]:
"""Caution: returns "None" when path not found
Try to combine queries with the function below because the pathing query is generally slow.
Expand Down Expand Up @@ -261,7 +272,10 @@ async def _query_building_placement_fast(
return [p.result == 1 for p in result.query.placements]

async def query_building_placement(
self, ability: AbilityData, positions: List[Union[Point2, Point3]], ignore_resources: bool = True
self,
ability: AbilityData,
positions: List[Union[Point2, Point3]],
ignore_resources: bool = True
) -> List[ActionResult]:
"""This function might be deleted in favor of the function above (_query_building_placement_fast).

Expand Down Expand Up @@ -341,7 +355,8 @@ async def toggle_autocast(self, units: Union[List[Unit], Units], ability: Abilit
sc_pb.Action(
action_raw=raw_pb.ActionRaw(
toggle_autocast=raw_pb.ActionRawToggleAutocast(
ability_id=ability.value, unit_tags=(u.tag for u in units)
ability_id=ability.value,
unit_tags=(u.tag for u in units),
)
)
)
Expand Down Expand Up @@ -373,8 +388,7 @@ async def debug_create_unit(self, unit_spawn_commands: List[List[Union[UnitTypeI
pos=position.as_Point2D,
quantity=amount_of_units,
)
)
for unit_type, amount_of_units, position, owner_id in unit_spawn_commands
) for unit_type, amount_of_units, position, owner_id in unit_spawn_commands
)
)
)
Expand Down Expand Up @@ -593,13 +607,11 @@ async def _send_debug(self):
debug_pb.DebugCommand(
draw=debug_pb.DebugDraw(
text=[text.to_proto() for text in self._debug_texts] if self._debug_texts else None,
lines=[line.to_proto() for line in self._debug_lines]
if self._debug_lines
else None,
lines=[line.to_proto()
for line in self._debug_lines] if self._debug_lines else None,
boxes=[box.to_proto() for box in self._debug_boxes] if self._debug_boxes else None,
spheres=[sphere.to_proto() for sphere in self._debug_spheres]
if self._debug_spheres
else None,
spheres=[sphere.to_proto()
for sphere in self._debug_spheres] if self._debug_spheres else None,
)
)
]
Expand Down Expand Up @@ -647,11 +659,9 @@ async def debug_set_unit_value(self, unit_tags: Union[Iterable[int], Units, Unit
debug=sc_pb.RequestDebug(
debug=(
debug_pb.DebugCommand(
unit_value=debug_pb.DebugSetUnitValue(
unit_value=unit_value, value=float(value), unit_tag=unit_tag
)
)
for unit_tag in unit_tags
unit_value=debug_pb.
DebugSetUnitValue(unit_value=unit_value, value=float(value), unit_tag=unit_tag)
) for unit_tag in unit_tags
)
)
)
Expand Down
10 changes: 9 additions & 1 deletion sc2/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,11 @@ async def _host_game(

client = await _setup_host_game(server, map_settings, players, realtime, random_seed, disable_fog)
# Bot can decide if it wants to launch with 'raw_affects_selection=True'
if not isinstance(players[0], Human) and getattr(players[0].ai, "raw_affects_selection", None) is not None:
if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None:
client.raw_affects_selection = players[0].ai.raw_affects_selection
# And 'enable_feature_layer=True'
if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None:
client.enable_feature_layer = players[0].ai.enable_feature_layer

try:
result = await _play_game(
Expand Down Expand Up @@ -510,6 +513,8 @@ async def _host_game_aiter(
client = await _setup_host_game(server, map_settings, players, realtime)
if not isinstance(players[0], Human) and getattr(players[0].ai, "raw_affects_selection", None) is not None:
client.raw_affects_selection = players[0].ai.raw_affects_selection
if not isinstance(players[0], Human) and getattr(players[0].ai, "enable_feature_layer", None) is not None:
client.enable_feature_layer = players[0].ai.enable_feature_layer

try:
result = await _play_game(players[0], client, realtime, portconfig, step_time_limit, game_time_limit)
Expand Down Expand Up @@ -548,6 +553,9 @@ async def _join_game(
# Bot can decide if it wants to launch with 'raw_affects_selection=True'
if not isinstance(players[1], Human) and getattr(players[1].ai, "raw_affects_selection", None) is not None:
client.raw_affects_selection = players[1].ai.raw_affects_selection
# And 'enable_feature_layer=True'
if not isinstance(players[1], Human) and getattr(players[1].ai, "enable_feature_layer", None) is not None:
client.enable_feature_layer = players[1].ai.enable_feature_layer

try:
result = await _play_game(players[1], client, realtime, portconfig, step_time_limit, game_time_limit)
Expand Down