3
3
#include < ranges>
4
4
#include < algorithm>
5
5
#include < iterator>
6
+ #include < chrono>
6
7
7
8
#include " async_reference.hh"
8
9
10
+ struct parsed_connection_string {
11
+ std::string host;
12
+ std::map<std::string, std::string> options;
13
+ };
14
+
15
+ static auto str_subrange (std::string_view str, auto start, auto end) {
16
+ return str.substr (start, end - start);
17
+ }
18
+
19
+ static auto parse_connection_string (std::string_view str)
20
+ -> parsed_connection_string {
21
+ auto result = parsed_connection_string{};
22
+
23
+ auto options_start = str.find (" ?" );
24
+
25
+ if (options_start == std::string::npos) {
26
+ // No options
27
+ result.host = str;
28
+ return result;
29
+ }
30
+
31
+ result.host = str.substr (0 , options_start);
32
+
33
+ auto amp_idx = options_start;
34
+
35
+ while (amp_idx != std::string::npos) {
36
+ auto next_amp_idx = str.find (" &" , amp_idx + 1 );
37
+
38
+ auto name_str = std::string{};
39
+ auto value_str = std::string{};
40
+ auto eq_idx = str.find (" =" , amp_idx);
41
+
42
+ if (eq_idx >= next_amp_idx) {
43
+ name_str = str_subrange (str, amp_idx + 1 , next_amp_idx);
44
+ } else {
45
+ name_str = str_subrange (str, amp_idx + 1 , eq_idx);
46
+ value_str = str_subrange (str, eq_idx + 1 , next_amp_idx);
47
+ }
48
+
49
+ result.options [name_str] = value_str;
50
+
51
+ amp_idx = next_amp_idx;
52
+ }
53
+
54
+ return result;
55
+ }
56
+
9
57
ecsact_async_request_id async_reference::connect (const char * connection_string
10
58
) {
59
+ auto req_id = next_request_id ();
60
+
11
61
std::string connect_str (connection_string);
12
62
13
- registry_id = ecsact_create_registry ( " async_reference_impl_reg " );
63
+ auto result = parse_connection_string (connect_str );
14
64
15
- auto req_id = next_request_id ();
16
- // The good and bad strings simulate the outcome of connections
17
- if (connect_str == " good" ) {
18
- is_connected = true ;
65
+ if (result.options .contains (" tick_rate" )) {
66
+ auto tick_str = result.options .at (" tick_rate" );
19
67
20
- execute_systems ();
21
- } else {
22
- // Same thing that happens in enqueue? Callback next flush?
68
+ int tick_int = std::stoi (tick_str);
69
+
70
+ tick_rate = std::chrono::milliseconds (tick_int);
71
+ }
72
+
73
+ // The good and bad strings simulate the outcome of connections
74
+ if (result.host != " good" ) {
23
75
is_connected = false ;
24
76
is_connected_notified = false ;
77
+ return req_id;
25
78
}
26
79
80
+ if (tick_rate.count () == 0 ) {
81
+ types::async_error async_err{
82
+ .error = ECSACT_ASYNC_INVALID_CONNECTION_STRING,
83
+ .request_ids = {req_id},
84
+ };
85
+
86
+ async_callbacks.add (async_err);
87
+ return req_id;
88
+ }
89
+
90
+ registry_id = ecsact_create_registry (" async_reference_impl_reg" );
91
+ is_connected = true ;
92
+ execute_systems ();
93
+
27
94
return req_id;
28
95
}
29
96
@@ -39,7 +106,6 @@ ecsact_async_request_id async_reference::enqueue_execution_options(
39
106
};
40
107
41
108
is_connected_notified = true ;
42
- // Could block here
43
109
async_callbacks.add (async_err);
44
110
return req_id;
45
111
}
@@ -51,24 +117,46 @@ ecsact_async_request_id async_reference::enqueue_execution_options(
51
117
.options = cpp_options,
52
118
};
53
119
54
- // Could block here
55
120
tick_manager.add_pending_options (pending_options);
56
121
return req_id;
57
122
}
58
123
59
124
void async_reference::execute_systems () {
60
125
execution_thread = std::thread ([this ] {
126
+ using namespace std ::chrono_literals;
127
+ using clock = std::chrono::high_resolution_clock;
128
+ using milliseconds = std::chrono::milliseconds;
129
+ using nanoseconds = std::chrono::nanoseconds;
130
+ using std::chrono::duration_cast;
131
+
132
+ nanoseconds execution_duration = {};
133
+ nanoseconds sleep_drift = {};
134
+
61
135
while (is_connected == true ) {
62
- // Could block here
63
136
auto async_err = tick_manager.validate_pending_options ();
64
137
138
+ const auto sleep_duration = tick_rate - execution_duration;
139
+
140
+ auto wait_start = clock::now ();
141
+ std::this_thread::sleep_for (sleep_duration - sleep_drift);
142
+ auto wait_end = clock::now ();
143
+
144
+ sleep_drift =
145
+ sleep_duration - duration_cast<nanoseconds>(wait_start - wait_end);
146
+
65
147
if (async_err.error != ECSACT_ASYNC_OK) {
66
148
async_callbacks.add (async_err);
67
149
68
- disconnect ();
150
+ is_connected = false ;
151
+ break ;
69
152
}
70
153
71
- auto cpp_options = tick_manager.get_options_now ();
154
+ auto start = clock::now ();
155
+
156
+ auto cpp_options = tick_manager.move_and_increment_tick ();
157
+
158
+ // TODO(Kelwan): Add done callbacks so we can resolve all requests
159
+ // https://github.com/ecsact-dev/ecsact_runtime/issues/102
72
160
73
161
ecsact_execution_events_collector collector;
74
162
collector.init_callback = &execution_callbacks::init_callback;
@@ -87,31 +175,17 @@ void async_reference::execute_systems() {
87
175
);
88
176
}
89
177
90
- std::vector<ecsact_async_request_id> pending_entities;
91
-
92
- // Could block here
93
- std::unique_lock lk (pending_m);
94
- pending_entities = std::move (pending_entity_requests);
95
- pending_entity_requests.clear ();
96
- lk.unlock ();
97
-
98
- for (auto & entity_request_id : pending_entities) {
99
- auto entity = ecsact_create_entity (*registry_id);
100
-
101
- types::entity created_entity{
102
- .entity_id = entity,
103
- .request_id = entity_request_id,
104
- };
105
- // Could block here
106
- async_callbacks.add (created_entity);
107
- }
178
+ process_entities ();
108
179
109
180
auto systems_error =
110
181
ecsact_execute_systems (*registry_id, 1 , options.get (), &collector);
111
182
183
+ auto end = clock::now ();
184
+ execution_duration = duration_cast<nanoseconds>(end - start);
185
+
112
186
if (systems_error != ECSACT_EXEC_SYS_OK) {
113
187
async_callbacks.add (systems_error);
114
- disconnect () ;
188
+ is_connected = false ;
115
189
return ;
116
190
}
117
191
}
@@ -129,8 +203,6 @@ void async_reference::flush_events(
129
203
}
130
204
131
205
ecsact_async_request_id async_reference::create_entity_request () {
132
- // NOTE: Add entity to both registries
133
- // Consider ensure entity
134
206
auto req_id = next_request_id ();
135
207
if (is_connected == false && is_connected_notified == false ) {
136
208
types::async_error async_err{
@@ -165,3 +237,23 @@ ecsact_async_request_id async_reference::next_request_id() {
165
237
ecsact_async_request_id async_reference::convert_request_id (int32_t id) {
166
238
return static_cast <ecsact_async_request_id>(id);
167
239
}
240
+
241
+ void async_reference::process_entities () {
242
+ std::vector<ecsact_async_request_id> pending_entities;
243
+
244
+ std::unique_lock lk (pending_m);
245
+ pending_entities = std::move (pending_entity_requests);
246
+ pending_entity_requests.clear ();
247
+ lk.unlock ();
248
+
249
+ for (auto & entity_request_id : pending_entities) {
250
+ auto entity = ecsact_create_entity (*registry_id);
251
+
252
+ types::entity created_entity{
253
+ .entity_id = entity,
254
+ .request_id = entity_request_id,
255
+ };
256
+
257
+ async_callbacks.add (created_entity);
258
+ }
259
+ }
0 commit comments