# Copyright 2023 LIN Yi. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import math
import time
import collections

import gym
import numpy as np

# Custom environment wrapper
class StreetFighterCustomWrapper(gym.Wrapper):
    def __init__(self, env, reset_round=True, rendering=False):
        super(StreetFighterCustomWrapper, self).__init__(env)
        self.env = env

        # Use a deque to store the last 9 frames
        self.num_frames = 9
        self.frame_stack = collections.deque(maxlen=self.num_frames)

        # Each agent action is repeated for this many emulator frames (frame skip).
        self.num_step_frames = 6

        # Damage dealt (and the win bonus) is weighted by this factor relative to damage taken.
        self.reward_coeff = 3.0

        self.total_timesteps = 0

        # Full health bar value in Street Fighter II.
        self.full_hp = 176
        self.prev_player_health = self.full_hp
        self.prev_oppont_health = self.full_hp

        # Observations are RGB frames downsampled by 2 in both spatial dimensions.
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(100, 128, 3), dtype=np.uint8)

        self.reset_round = reset_round
        self.rendering = rendering

    def _stack_observation(self):
        # Take every third frame from the 9-frame buffer (indices 2, 5, 8) and keep one
        # colour channel from each, producing a (100, 128, 3) image that encodes the
        # motion of roughly the last 9 emulator frames.
        return np.stack([self.frame_stack[i * 3 + 2][:, :, i] for i in range(3)], axis=-1)

    def reset(self):
        observation = self.env.reset()

        self.prev_player_health = self.full_hp
        self.prev_oppont_health = self.full_hp

        self.total_timesteps = 0

        # Clear the frame stack and add the first observation [num_frames] times
        self.frame_stack.clear()
        for _ in range(self.num_frames):
            self.frame_stack.append(observation[::2, ::2, :])

        return np.stack([self.frame_stack[i * 3 + 2][:, :, i] for i in range(3)], axis=-1)

    def step(self, action):
        custom_done = False

        obs, _reward, _done, info = self.env.step(action)
        self.frame_stack.append(obs[::2, ::2, :])

        # Render the game if rendering flag is set to True.
        if self.rendering:
            self.env.render()
            time.sleep(0.01)

        for _ in range(self.num_step_frames - 1):
            # Keep the button pressed for (num_step_frames - 1) frames.
            obs, _reward, _done, info = self.env.step(action)
            self.frame_stack.append(obs[::2, ::2, :])

            if self.rendering:
                self.env.render()
                time.sleep(0.01)

        curr_player_health = info['agent_hp']
        curr_oppont_health = info['enemy_hp']

        self.total_timesteps += self.num_step_frames

        # Game is over and player loses.
        if curr_player_health < 0:
            custom_reward = -math.pow(self.full_hp, (curr_oppont_health + 1) / (self.full_hp + 1))    # Use the opponent's remaining health points as the penalty.
            # If the opponent's health is also negative (double knockout), the penalty shrinks to roughly -1.
            custom_done = True

        # Game is over and player wins.
        elif curr_oppont_health < 0:
            # custom_reward = curr_player_health * self.reward_coeff    # Use the player's remaining health points as the reward.
            # Multiply by reward_coeff so that winning pays more than the possible penalty, which keeps the agent from playing too cautiously.
            # custom_reward = math.pow(self.full_hp, (5940 - self.total_timesteps) / 5940) * self.reward_coeff    # Use the remaining time steps as the reward.
            custom_reward = math.pow(self.full_hp, (curr_player_health + 1) / (self.full_hp + 1)) * self.reward_coeff
            custom_done = True

        # While the fighting is still going on
        else:
            custom_reward = self.reward_coeff * (self.prev_oppont_health - curr_oppont_health) - (self.prev_player_health - curr_player_health)
            self.prev_player_health = curr_player_health
            self.prev_oppont_health = curr_oppont_health
            custom_done = False
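
        # Worked examples of the shaping above (illustrative; the numbers follow directly
        # from full_hp = 176, reward_coeff = 3.0 and the 0.001 normalization applied at
        # the end of step()):
        #   - landing 20 damage while taking none:        3.0 * 20 - 0   = 60   -> 0.060
        #   - winning the round with full health:         176 ** 1 * 3.0 = 528  -> 0.528
        #   - losing while the opponent has full health:  -(176 ** 1)    = -176 -> -0.176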

        # When reset_round flag is set to False (never reset), the session should always keep going.
        if not self.reset_round:
            custom_done = False

        # Max reward is 6 * full_hp = 1056 (damage * 3 + winning_reward * 3); norm_coefficient = 0.001
        return self._stack_observation(), 0.001 * custom_reward, custom_done, info    # reward normalization
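

# --- Usage sketch (illustrative, not part of the wrapper itself) -------------
# A minimal example of how this wrapper might be applied to a gym-retro
# environment. The game and state names below are assumptions based on common
# Street Fighter II integrations and may need to be adjusted to match the ROM
# integration installed locally.
if __name__ == "__main__":
    import retro

    env = retro.make(
        game="StreetFighterIISpecialChampionEdition-Genesis",  # assumed integration name
        state="Champion.Level1.RyuVsGuile",                    # assumed starting state
        use_restricted_actions=retro.Actions.FILTERED,
        obs_type=retro.Observations.IMAGE,
    )
    env = StreetFighterCustomWrapper(env, reset_round=True, rendering=True)

    obs = env.reset()
    done = False
    while not done:
        # Random policy, just to exercise the wrapper end to end.
        obs, reward, done, info = env.step(env.action_space.sample())
    env.close()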
|
2023-04-02 16:19:56 +00:00
|
|
|
|