Computer Science

How to change the reward functions by different approach based on the following code?

1. Positive reinforcement: For each successful step, a reward is given.

2. Negative reinforcement: For each unsuccessful step, a penalty is imposed.

3. Reward shaping: A reward is given for each step that brings the agent closer to the goal state.

4. Punishment: Each step that moves the agent away from the goal state incurs a penalty.

5. Extrinsic rewards: A reward is given for each step that an external source specifies.
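
All five strategies can be implemented by adjusting the reward returned by env.step() before the transition is pushed into replay memory. The sketch below is one illustrative way to express them as helper functions; it assumes CartPole's state layout (cart position, cart velocity, pole angle, pole angular velocity), and the function names and the 0.1 / 1.0 bonus and penalty magnitudes are arbitrary choices, not part of the original code. A commented hook inside the training loop further down shows where these helpers would be called.

# Hypothetical reward-modification helpers (illustrative only).
# CartPole's state is (x, x_dot, theta, theta_dot); "closer to the goal"
# is approximated here by a smaller pole angle |theta|.

def positive_reinforcement(reward, done):
    # 1. Extra reward for every successful (non-terminal) step.
    return reward + 0.1 if not done else reward

def negative_reinforcement(reward, done):
    # 2. Penalty when a step ends the episode (failure).
    return reward - 1.0 if done else reward

def reward_shaping(reward, state, prev_state):
    # 3. Bonus when the pole moved closer to upright than on the previous step.
    return reward + 0.1 if abs(state[2]) < abs(prev_state[2]) else reward

def punishment(reward, state, prev_state):
    # 4. Penalty when the pole moved further from upright.
    return reward - 0.1 if abs(state[2]) > abs(prev_state[2]) else reward

def extrinsic_reward(reward, bonus_fn, state):
    # 5. Add whatever bonus an external source (bonus_fn) specifies for this state.
    return reward + bonus_fn(state)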


 

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# Initialize the "Cart-Pole" environment
# .unwrapped strips the TimeLimit wrapper (the 200-step cap), exposing the raw environment
env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
# get_backend() returns the name of the current backend.
is_ipython = 'inline' in matplotlib.get_backend()   
if is_ipython:
   from IPython import display

# plt.ion() switches matplotlib to interactive mode, so the script keeps
# executing even when plt.show() is encountered.
plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 

# 1. Replay Memory: Using experience replay memory to train the DQN


# Map (state, action) to their (next_state, reward) result, where state is the screen difference image 
Transition = namedtuple('Transition',
                       ('state', 'action', 'next_state', 'reward'))


# * ReplayMemory: A buffer of a bounded size that holds the most recently observed transitions.
# Also implements a .sample() method for selecting a random batch of transitions for training.
class ReplayMemory(object):

   def __init__(self, capacity):
       self.capacity = capacity    # 10000
       self.memory = []
       self.position = 0

   def push(self, *args):
       """Saves a transition."""
       if len(self.memory) < self.capacity:
           self.memory.append(None)
       self.memory[self.position] = Transition(*args)  # *args packs the positional arguments into a tuple: (state, action, next_state, reward)
       self.position = (self.position + 1) % self.capacity

   def sample(self, batch_size):
       return random.sample(self.memory, batch_size)

   def __len__(self):
       return len(self.memory)
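
# Usage sketch (illustrative, not part of the original program): one Transition is
# pushed per environment step and a random minibatch is sampled later, which
# decorrelates consecutive frames. With a hypothetical buffer `mem`:
#   mem = ReplayMemory(10000)
#   mem.push(state, action, next_state, reward)   # stores one Transition
#   batch = mem.sample(BATCH_SIZE)                # list of BATCH_SIZE random Transitions
# Once capacity is reached, push() overwrites the oldest entries in circular order.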


# 2. Q_network: DQN algorithm (off-policy)

class DQN(nn.Module):

   def __init__(self, h, w, outputs):
       super(DQN, self).__init__()
       self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)  # in_channels=3 (RGB), out_channels=16,
                                                               # kernel_size: side length of the square kernel (int, or an (h, w) tuple for a non-square kernel),
                                                               # stride: how far the kernel moves at each step
       self.bn1 = nn.BatchNorm2d(16)   # batch-normalize the conv output;
                                       # num_features = number of channels C of an (N, C, H, W) input
       self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
       self.bn2 = nn.BatchNorm2d(32)
       self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
       self.bn3 = nn.BatchNorm2d(32)

       # The number of linear input connections depends on the output of the conv2d layer and therefore on the size of the input image
       def conv2d_size_out(size, kernel_size=5, stride=2):
           return (size - (kernel_size - 1) - 1) // stride + 1
       convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
       convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
       linear_input_size = convw * convh * 32
       self.head = nn.Linear(linear_input_size, outputs)

   # Called with either one element to determine the next action, or with a batch during optimization. Returns tensor([[left0exp, right0exp], ...])
   def forward(self, x):
       x = F.relu(self.bn1(self.conv1(x)))
       x = F.relu(self.bn2(self.conv2(x)))
       x = F.relu(self.bn3(self.conv3(x)))
       return self.head(x.view(x.size(0), -1)) # x.view(x.size(0), -1) flattens everything except the batch dimension
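
# Shape check (illustrative): applying conv2d_size_out three times to a 3x40x90 input gives
# w: 90 -> 43 -> 20 -> 8 and h: 40 -> 18 -> 7 -> 2, so linear_input_size = 8 * 2 * 32 = 512.
# A quick hypothetical sanity test:
#   net = DQN(40, 90, 2)
#   net(torch.zeros(1, 3, 40, 90)).shape   # -> torch.Size([1, 2]), one Q-value per action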


# 3. Input Extraction

# Integrate multiple steps together with Compose
resize = T.Compose([T.ToPILImage(),  # convert a tensor to a PIL image
                    T.Resize(40, interpolation=Image.BICUBIC),  # downscale so the shorter side is 40 px
                    T.ToTensor()])  # convert a PIL Image (H x W x C, values 0-255) to a torch.Tensor (C x H x W, values 0.0-1.0)

# Get the location of the cart
def get_cart_location(screen_width):
   world_width = env.x_threshold * 2   # world width; x_threshold = 2.4, i.e. the cart never moves more than 2.4 units from the center
   scale = screen_width / world_width
   return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART


def get_screen():
   # The screen returned by gym is 400x600x3, but can be larger, e.g. 800x1200x3. Transpose it into torch order (CHW).
   screen = env.render(mode='rgb_array').transpose((2, 0, 1))
   # The cart is in the lower half, so strip off the top and bottom of the screen
   _, screen_height, screen_width = screen.shape
   screen = screen[:, int(screen_height * 0.4):int(screen_height * 0.8)]
   view_width = int(screen_width * 0.6)
   cart_location = get_cart_location(screen_width)
   if cart_location < view_width // 2:
       slice_range = slice(view_width)
   elif cart_location > (screen_width - view_width // 2):
       slice_range = slice(-view_width, None)
   else:
       slice_range = slice(cart_location - view_width // 2,
                           cart_location + view_width // 2)
   # Remove the edges to get a square image centered on the cart
   screen = screen[:, :, slice_range]
   # Convert to float type, rescale, convert to torch tensor
   # (this doesn't require a copy)
   screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
   screen = torch.from_numpy(screen)   # torch.from_numpy() shares memory with the numpy array: modifying one modifies the other
   # Resize and add batch dimension (BCHW)
   return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()   
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(), interpolation='none')    
plt.title('Example extracted screen')
plt.show()
 

# 4. Training: Instantiates the model and its optimizer, and defines some utilities

BATCH_SIZE = 128    # size of minibatch
GAMMA = 0.999       # discount factor for target Q
EPS_START = 0.9     # starting value of epsilon
EPS_END = 0.05      # final value of epsilon
EPS_DECAY = 200
TARGET_UPDATE = 10

# Get the screen size so that the layers can be initialized correctly based on the shape returned by gym.
# Typical dimensions at this point are about 3x40x90, the result of the clipping and downscaling in get_screen().
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape

# Get action count from gym action space
n_actions = env.action_space.n 

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()   

optimizer = optim.RMSprop(policy_net.parameters())  
memory = ReplayMemory(10000)

steps_done = 0
 

def select_action(state):
   global steps_done
   sample = random.random()    
   eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
   steps_done += 1
   if sample > eps_threshold:
       with torch.no_grad():
           # .max(1) returns (max values, argmax indices) for each row;
           # [1] selects the indices, i.e. the action with the largest expected Q-value.
           return policy_net(state).max(1)[1].view(1, 1)
   else:
       return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)   # explore: pick a random action with probability eps_threshold
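
# Worked example of the decay schedule above (values rounded):
#   eps_threshold(steps_done=0)    = 0.05 + 0.85 * exp(0)  = 0.90
#   eps_threshold(steps_done=200)  = 0.05 + 0.85 * exp(-1) ~= 0.36
#   eps_threshold(steps_done=1000) = 0.05 + 0.85 * exp(-5) ~= 0.056
# so exploration fades towards near-greedy action selection after a few thousand steps.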

episode_durations = []
 

def plot_durations():
   plt.figure(2)
   plt.clf()
   durations_t = torch.tensor(episode_durations, dtype=torch.float)
   plt.title('Training...')
   plt.xlabel('Episode')
   plt.ylabel('Duration') 
   plt.plot(durations_t.numpy())
   # Take the average of 100 episodes and plot them
   if len(durations_t) >= 100:
       means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
       means = torch.cat((torch.zeros(99), means))
       plt.plot(means.numpy())
   plt.pause(0.001)  # Pause to update the graph
   if is_ipython:
       display.clear_output(wait=True)
       display.display(plt.gcf())

 

# 5. Training Loop: optimize_model function that performs a single step of the optimization

def optimize_model():

   if len(memory) < BATCH_SIZE:
       return

   transitions = memory.sample(BATCH_SIZE)
   # transpose batch
   # This will convert the batch array of transitions into a transition of batch arrays.
   batch = Transition(*zip(*transitions))

   # Calculate the mask of the non-final state and concatenate the batch elements (the final state will be the state after the simulation ends)
   non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)
   non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])   # torch.cat concatenates the non-final next-state tensors along the batch dimension
   state_batch = torch.cat(batch.state)
   action_batch = torch.cat(batch.action)
   reward_batch = torch.cat(batch.reward)

   # Compute Q(s_t, a) - model computes Q(s_t) and then we select the column of actions to take.
   # These are the actions to take for each batch state according to policy_net
   state_action_values = policy_net(state_batch).gather(1, action_batch)

   # Compute V(s_{t+1}) for all next states.
   # Expected values of actions for non_final_next_states are computed with the "older" target_net,
   # taking the best value via max(1)[0]. Thanks to the mask, states that were final keep a value of 0.
   next_state_values = torch.zeros(BATCH_SIZE, device=device)
   next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
   # Compute the expected Q values: reward + GAMMA * V(s_{t+1})
   expected_state_action_values = (next_state_values * GAMMA) + reward_batch
  
   # Calculate the Huber loss
   loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1)) 
  
   # Optimization model
   optimizer.zero_grad()   
   loss.backward()
   for param in policy_net.parameters():
       param.grad.data.clamp_(-1, 1)   

   optimizer.step()


# Main process of training
num_episodes = 300
for i_episode in range(num_episodes):
   # Initialize the environment and state
   env.reset()
   last_screen = get_screen()
   current_screen = get_screen()
   state = current_screen - last_screen
   for t in count():
       # Choose an action and execute it
       action = select_action(state)
       _, reward, done, _ = env.step(action.item())   # classic Gym step API; Gym >= 0.26 / Gymnasium returns (obs, reward, terminated, truncated, info) instead
       reward = torch.tensor([reward], device=device)
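
       # --- Hypothetical hook for the five reward approaches (not part of the original code) ---
       # The helper functions sketched above the imports would be applied here, before the
       # transition is stored, e.g. for approach 1 (positive reinforcement):
       # reward = torch.tensor([positive_reinforcement(reward.item(), done)], device=device)
       # The other approaches plug in the same way; 3 and 4 also need the previous env.state to be tracked.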
    
       # Observe the new state
       last_screen = current_screen
       current_screen = get_screen()
       if not done:
           next_state = current_screen - last_screen
       else:
           next_state = None
    
       # Store transitions in memory
       memory.push(state, action, next_state, reward)
    
       # Move to the next state
       state = next_state
    
       # Perform one step of the optimization (on the policy network)
       optimize_model()
       if done:
           episode_durations.append(t + 1)
           plot_durations()
           break
   
   # Update the target network, copying all weights and biases in the DQN
   if i_episode % TARGET_UPDATE == 0:  
       target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()    
env.close()
plt.ioff()  
plt.show()
