street-fighter-ai/main/general/street_fighter_custom_wrapper_general.py
2024-05-05 18:37:48 +02:00

160 lines
6.3 KiB
Python

# Copyright 2024 WANG Jing. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import math
import time
import collections
import gym
import numpy as np
# Custom environment wrapper
class StreetFighterCustomWrapper(gym.Wrapper):
def __init__(self, env, reset_round=False, rendering=False):
super(StreetFighterCustomWrapper, self).__init__(env)
self.env = env
self.win = 0
self.loss = 0
self.round = 0
self.round_end = False
self.jump = False
# Use a deque to store the last 9 frames
self.num_frames = 9
self.frame_stack = collections.deque(maxlen=self.num_frames)
self.num_step_frames = 6
self.reward_coeff = 3.0
self.total_timesteps = 0
self.match1_reward = 0
self.full_hp = 176
self.prev_player_health = self.full_hp
self.prev_oppont_health = self.full_hp
self.observation_space = gym.spaces.Box(low=0, high=255, shape=(100, 128, 3), dtype=np.uint8)
self.reset_round = reset_round
self.rendering = rendering
def _stack_observation(self):
return np.stack([self.frame_stack[i * 3 + 2][:, :, i] for i in range(3)], axis=-1)
def reset(self):
observation = self.env.reset()
self.win = 0
self.loss = 0
self.prev_player_health = self.full_hp
self.prev_oppont_health = self.full_hp
self.total_timesteps = 0
# Clear the frame stack and add the first observation [num_frames] times
self.frame_stack.clear()
for _ in range(self.num_frames):
self.frame_stack.append(observation[::2, ::2, :])
return np.stack([self.frame_stack[i * 3 + 2][:, :, i] for i in range(3)], axis=-1)
def step(self, action):
custom_done = False
obs, _reward, _done, info = self.env.step(action)
if not self.jump:
self.frame_stack.append(obs[::2, ::2, :])
# Render the game if rendering flag is set to True.
if self.rendering:
self.env.render()
#time.sleep(0.003)
for _ in range(self.num_step_frames - 1):
# Keep the button pressed for (num_step_frames - 1) frames.
obs, _reward, _done, info= self.env.step(action)
if not self.jump:
self.frame_stack.append(obs[::2, ::2, :])
if self.rendering:
self.env.render()
#time.sleep(0.003)
curr_player_health = info['agent_hp']
curr_oppont_health = info['enemy_hp']
if not custom_done:
if self.round_end:
while not(curr_player_health == self.full_hp and curr_oppont_health == self.full_hp):
self.jump = True
obs, _reward, _done, info = self.env.step([0] * 12)
curr_player_health = info['agent_hp']
curr_oppont_health = info['enemy_hp']
if self.rendering:
self.env.render()
#time.sleep(0.01)
self.round_end = False
self.jump = False
if self.jump :
custom_reward = np.nan
self.prev_player_health = self.full_hp
self.prev_oppont_health = self.full_hp
custom_done = False
else :
self.total_timesteps += self.num_step_frames
reduce_enermy_health = self.prev_oppont_health - curr_oppont_health
reduce_player_health = self.prev_player_health - curr_player_health
# Determine game status and calculate rewards.
if reduce_player_health > 0 and curr_player_health < 0:
custom_reward = -math.pow(self.full_hp, (curr_oppont_health + 1) / (self.full_hp + 1))
self.loss += 1
self.round +=1
self.round_end = True
#print('loss')
if self.loss == 2:
self.loss = 0
self.round = 0
custom_done = True
elif reduce_enermy_health > 0 and curr_oppont_health <0 :
custom_reward = math.pow(self.full_hp, (curr_player_health + 1) / (self.full_hp + 1)) * self.reward_coeff
self.win += 1
self.round +=1
self.round_end = True
#print('win')
if self.win == 2:
self.win = 0
self.round = 0
custom_done = True
else:
if reduce_enermy_health >= 0 and reduce_player_health >= 0:
if reduce_enermy_health > 0 or reduce_player_health > 0:
custom_reward = self.reward_coeff * (reduce_enermy_health - reduce_player_health)
else:
custom_reward = 0
else:
custom_reward = 0
# Update health states.
self.prev_player_health = curr_player_health
self.prev_oppont_health = curr_oppont_health
# Max reward is 2 * 6 * full_hp = 2108 2 * (damage * 3 + winning_reward * 3) norm_coefficient = 0.001 for win first two
# Or in three round 2 * 6 * full_hp - full_hp - 1 + 3 * full_hp = 2464 (damage * 3 + winning_reward * 3)
#print("reward{}, reduce_player_health:{}, reduce_enermy_health:{}, agentHP:{}, enemyHP:{}".format(custom_reward,reduce_player_health,reduce_enermy_health,curr_player_health,curr_oppont_health))
return self._stack_observation(), 0.001 * custom_reward, custom_done, info # reward normalization