import gym
import numpy as np
import pandas as pd
import json
from pathlib import Path

# Only emit gym error-level logs (level 40) to keep rollout output clean
gym.logger.set_level(40)


class VectoredGymEnvironment():
    """
    Environment class to run multiple simulations and collect rollout data
    """
    def __init__(self, registered_gym_env, num_of_envs=1):
        self.envs_initialized = False
        self.initialized_envs = {}
        self.env_states = {}
        self.env_reset_counter = {}
        self.num_of_envs = num_of_envs
        self.data_rows = []

        self.initialize_envs(num_of_envs, registered_gym_env)
    def is_initialized(self):
        return self.envs_initialized

    def initialize_envs(
            self,
            num_of_envs,
            registered_gym_env):
        """Initialize multiple OpenAI Gym environments.
        Each environment starts with a different random seed.

        Arguments:
            num_of_envs {int} -- Number of environments/simulations to initiate
            registered_gym_env {str} -- Environment name of the registered gym environment
        """
        print("Initializing {} environments of {}".format(num_of_envs, registered_gym_env))
        for i in range(0, num_of_envs):
            environment_id = "environment_" + str(i)
            environment = gym.make(registered_gym_env)
            environment = environment.unwrapped
            environment.seed(i)
            self.env_states[environment_id] = environment.reset()
            self.env_reset_counter[environment_id] = 0
            self.initialized_envs[environment_id] = environment
        self.envs_initialized = True
        self.state_dims = len(self.env_states[environment_id])
    def get_environment_states(self):
        return self.env_states

    def dump_environment_states(self, dir_path, file_name):
        """Dump the current states of all the environments into a file,
        one JSON list per line.

        Arguments:
            dir_path {str} -- Directory path of the target file
            file_name {str} -- File name of the target file
        """
        data_folder = Path(dir_path)
        file_path = data_folder / file_name

        with open(file_path, 'w') as outfile:
            for state in self.env_states.values():
                json.dump(list(state), outfile)
                outfile.write('\n')
    def get_environment_ids(self):
        return list(self.initialized_envs.keys())

    def step(self, environment_id, action):
        local_env = self.initialized_envs[environment_id]
        observation, reward, done, info = local_env.step(action)

        self.env_states[environment_id] = observation
        return observation, reward, done, info

    def reset(self, environment_id):
        self.env_states[environment_id] = \
            self.initialized_envs[environment_id].reset()
        return self.env_states[environment_id]

    def reset_all_envs(self):
        print("Resetting all the environments...")
        for i in range(0, self.num_of_envs):
            environment_id = "environment_" + str(i)
            self.reset(environment_id)

    def close(self, environment_id):
        self.initialized_envs[environment_id].close()
        return

    def render(self, environment_id):
        self.initialized_envs[environment_id].render()
        return
    def collect_rollouts_for_single_env_with_given_episodes(self, environment_id, action_prob, num_episodes):
        """Collect rollouts for a given number of episodes from one environment

        Arguments:
            environment_id {str} -- Environment id for the environment
            action_prob {list} -- Action probabilities of the simulated policy
            num_episodes {int} -- Number of episodes to run rollouts
        """
        # Normalize in case the sum of the probabilities is not exactly 1
        action_prob = np.array(action_prob)
        if action_prob.sum() != 1:
            action_prob /= action_prob.sum()
        action_prob = list(action_prob)

        for _ in range(num_episodes):
            done = False
            cumulative_rewards = 0
            while not done:
                data_item = []
                action = np.random.choice(len(action_prob), p=action_prob)
                cur_state_features = self.env_states[environment_id]
                _, reward, done, _ = self.step(environment_id, action)
                cumulative_rewards += reward
                # Unique episode id: environment index offset by the number of resets
                episode_id = int(environment_id.split('_')[-1]) + \
                    self.num_of_envs * self.env_reset_counter[environment_id]
                if not done:
                    data_item.extend([action, action_prob, episode_id, reward, 0.0])
                else:
                    data_item.extend([action, action_prob, episode_id, reward, cumulative_rewards])
                for j in range(len(cur_state_features)):
                    data_item.append(cur_state_features[j])
                self.data_rows.append(data_item)

            self.reset(environment_id)
            self.env_reset_counter[environment_id] += 1
    def collect_rollouts_for_single_env_with_given_steps(self, environment_id, action_prob, num_steps):
        """Collect rollouts for a given number of steps from one environment

        Arguments:
            environment_id {str} -- Environment id for the environment
            action_prob {list} -- Action probabilities of the simulated policy
            num_steps {int} -- Number of steps to run rollouts
        """
        # Normalize in case the sum of the probabilities is not exactly 1
        action_prob = np.array(action_prob)
        if action_prob.sum() != 1:
            action_prob /= action_prob.sum()
        action_prob = list(action_prob)

        for _ in range(num_steps):
            data_item = []
            action = np.random.choice(len(action_prob), p=action_prob)
            cur_state_features = self.env_states[environment_id]
            _, reward, done, _ = self.step(environment_id, action)
            # Unique episode id: environment index offset by the number of resets
            episode_id = int(environment_id.split('_')[-1]) + \
                self.num_of_envs * self.env_reset_counter[environment_id]
            data_item.extend([action, action_prob, episode_id, reward])
            for j in range(len(cur_state_features)):
                data_item.append(cur_state_features[j])
            self.data_rows.append(data_item)
            if done:
                self.reset(environment_id)
                self.env_reset_counter[environment_id] += 1
    def collect_rollouts_with_given_action_probs(self, num_steps=None, num_episodes=None, action_probs=None, file_name=None):
        """Collect rollouts from all the initiated environments with given action probs

        Keyword Arguments:
            num_steps {int} -- Number of steps to run rollouts (default: {None})
            num_episodes {int} -- Number of episodes to run rollouts (default: {None})
            action_probs {list} -- Action probs for the policy (default: {None})
            file_name {str} -- Batch transform output that contains predictions of probs (default: {None})

        Returns:
            [Dataframe] -- Dataframe that contains the rollout data from all envs
        """
        if file_name is not None:
            # Load per-environment action probabilities from a batch transform
            # output file in JSON Lines format
            assert action_probs is None
            json_lines = [json.loads(line.rstrip('\n')) for line in open(file_name) if line != '']
            action_probs = []
            for line in json_lines:
                if line.get('SageMakerOutput') is not None:
                    action_probs.append(line['SageMakerOutput'].get("predictions")[0])
                else:
                    action_probs.append(line.get("predictions")[0])

        assert len(action_probs) == self.num_of_envs
        for index, environment_id in enumerate(self.get_environment_ids()):
            if num_steps is not None:
                assert num_episodes is None
                self.collect_rollouts_for_single_env_with_given_steps(
                    environment_id, action_probs[index], num_steps
                )
            else:
                assert num_episodes is not None
                self.collect_rollouts_for_single_env_with_given_episodes(
                    environment_id, action_probs[index], num_episodes
                )

        col_names = self._create_col_names()
        df = pd.DataFrame(self.data_rows, columns=col_names)

        return df
    def _create_col_names(self):
        """Create column names of dataframe that can be consumed by Coach

        Returns:
            [list] -- List of column names
        """
        col_names = ['action', 'all_action_probabilities', 'episode_id', 'reward', 'cumulative_rewards']
        for i in range(self.state_dims):
            col_names.append('state_feature_' + str(i))

        return col_names
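

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). The environment
# name "CartPole-v0", the number of environments, and the uniform two-action
# policy below are assumptions chosen only to demonstrate the API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Spin up two copies of an assumed registered gym environment
    vectored_envs = VectoredGymEnvironment("CartPole-v0", num_of_envs=2)

    # Collect 5 episodes per environment under a uniform random policy,
    # passing one action-probability vector per environment
    uniform_probs = [[0.5, 0.5]] * vectored_envs.num_of_envs
    rollout_df = vectored_envs.collect_rollouts_with_given_action_probs(
        num_episodes=5, action_probs=uniform_probs
    )
    print(rollout_df.head())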