Skip to content

Commit fd78136

Browse files
committed
Lots of thread safety
1 parent 66add4c commit fd78136

File tree

14 files changed

+472
-169
lines changed

14 files changed

+472
-169
lines changed

ecsact/runtime/async.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ typedef enum {
7777
*/
7878
typedef void (*ecsact_async_error_callback)(
7979
//
80-
ecsact_async_error async_err,
81-
ecsact_async_request_id request_id,
82-
void* callback_user_data
80+
ecsact_async_error async_err,
81+
int request_ids_length,
82+
ecsact_async_request_id* request_ids,
83+
void* callback_user_data
8384
);
8485

8586
/**

reference/async_reference/async_reference.cc

Lines changed: 43 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,20 @@ ecsact_async_request_id async_reference::connect(const char* connection_string
1010
) {
1111
std::string connect_str(connection_string);
1212

13+
registry_id = ecsact_create_registry("async_reference_impl_reg");
14+
pending_registry_id =
15+
ecsact_create_registry("pending_async_reference_impl_reg");
16+
1317
auto req_id = next_request_id();
1418
// The good and bad strings simulate the outcome of connections
1519
if(connect_str == "good") {
1620
is_connected = true;
1721

18-
registry_id = ecsact_create_registry("async_reference_impl_reg");
19-
pending_registry_id =
20-
ecsact_create_registry("pending_async_reference_impl_reg");
21-
2222
execute_systems();
2323
} else {
2424
// Same thing that happens in enqueue? Callback next flush?
2525
is_connected = false;
26+
is_connected_notified = false;
2627
}
2728

2829
return req_id;
@@ -33,57 +34,43 @@ ecsact_async_request_id async_reference::enqueue_execution_options(
3334
) {
3435
auto req_id = next_request_id();
3536

36-
if(is_connected == false) {
37+
if(is_connected == false && is_connected_notified == false) {
3738
types::async_error async_err{
3839
.error = ECSACT_ASYNC_ERR_PERMISSION_DENIED,
39-
.request_id = req_id};
40-
41-
// ecsact_add_component(
42-
// *pending_registry_id,
43-
// entity_id,
44-
// component_id,
45-
46-
// )
40+
.request_ids = {req_id},
41+
};
4742

43+
is_connected_notified = true;
4844
// Could block here
4945
async_callbacks.add(async_err);
5046
return req_id;
5147
}
5248

53-
auto cpp_options = util::c_to_cpp_execution_options(options);
54-
auto error = util::validate_options(cpp_options);
49+
auto cpp_options =
50+
util::c_to_cpp_execution_options(options, *pending_registry_id);
5551

56-
auto async_err = types::async_error{
57-
.error = error,
52+
types::pending_execution_options pending_options{
5853
.request_id = req_id,
54+
.options = cpp_options,
5955
};
6056

61-
if(error != ECSACT_ASYNC_OK) {
62-
// Could block here
63-
async_callbacks.add(async_err);
64-
65-
disconnect();
66-
return req_id;
67-
}
68-
6957
// Could block here
70-
error = tick_manager.try_add_options(cpp_options);
71-
async_err.error = error;
72-
73-
if(error != ECSACT_ASYNC_OK) {
74-
// Could block here
75-
async_callbacks.add(async_err);
76-
77-
disconnect();
78-
return req_id;
79-
}
58+
tick_manager.add_pending_options(pending_options);
8059
return req_id;
8160
}
8261

8362
void async_reference::execute_systems() {
8463
execution_thread = std::thread([this] {
8564
while(is_connected == true) {
8665
// Could block here
66+
auto async_err = tick_manager.validate_pending_options();
67+
68+
if(async_err.error != ECSACT_ASYNC_OK) {
69+
async_callbacks.add(async_err);
70+
71+
disconnect();
72+
}
73+
8774
auto cpp_options = tick_manager.get_options_now();
8875

8976
ecsact_execution_events_collector collector;
@@ -99,22 +86,14 @@ void async_reference::execute_systems() {
9986

10087
if(cpp_options) {
10188
options = std::make_unique<ecsact_execution_options>(
102-
util::cpp_to_c_execution_options(*cpp_options, *registry_id)
89+
util::cpp_to_c_execution_options(
90+
*cpp_options,
91+
*registry_id,
92+
*pending_registry_id
93+
)
10394
);
10495
}
10596

106-
auto systems_error =
107-
ecsact_execute_systems(*registry_id, 1, options.get(), &collector);
108-
109-
if(systems_error != ECSACT_EXEC_SYS_OK) {
110-
async_callbacks.add(systems_error);
111-
disconnect();
112-
return;
113-
}
114-
115-
// Could block here
116-
tick_manager.increment_and_merge_tick();
117-
11897
std::vector<ecsact_async_request_id> pending_entities;
11998

12099
// Could block here
@@ -134,6 +113,15 @@ void async_reference::execute_systems() {
134113
// Could block here
135114
async_callbacks.add(created_entity);
136115
}
116+
117+
auto systems_error =
118+
ecsact_execute_systems(*registry_id, 1, options.get(), &collector);
119+
120+
if(systems_error != ECSACT_EXEC_SYS_OK) {
121+
async_callbacks.add(systems_error);
122+
disconnect();
123+
return;
124+
}
137125
}
138126
});
139127
}
@@ -142,33 +130,24 @@ void async_reference::flush_events(
142130
const ecsact_execution_events_collector* execution_events,
143131
const ecsact_async_events_collector* async_events
144132
) {
145-
if(async_events != nullptr) {
146-
// Could block here
147-
bool breaking_error = async_callbacks.invoke(*async_events);
148-
149-
if(breaking_error) {
150-
// Just disconnect immediately
151-
disconnect();
152-
return;
153-
}
154-
}
155-
156-
if(execution_events != nullptr) {
157-
// Could block here
158-
exec_callbacks.invoke(*execution_events, *registry_id);
133+
async_callbacks.invoke(async_events);
134+
if(registry_id) {
135+
exec_callbacks.invoke(execution_events, *registry_id);
159136
}
160137
}
161138

162139
ecsact_async_request_id async_reference::create_entity_request() {
163140
// NOTE: Add entity to both registries
164141
// Consider ensure entity
165142
auto req_id = next_request_id();
166-
if(is_connected == false) {
143+
if(is_connected == false && is_connected_notified == false) {
167144
types::async_error async_err{
168145
.error = ECSACT_ASYNC_ERR_PERMISSION_DENIED,
169-
.request_id = req_id};
146+
.request_ids = {req_id},
147+
};
170148

171149
async_callbacks.add(async_err);
150+
is_connected_notified = true;
172151

173152
return req_id;
174153
}

reference/async_reference/async_reference.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ private:
6262
std::thread execution_thread;
6363
std::mutex pending_m;
6464
std::atomic_bool is_connected = false;
65+
std::atomic_bool is_connected_notified = false;
6566

6667
ecsact_async_request_id next_request_id();
6768
ecsact_async_request_id convert_request_id(int32_t id);

reference/async_reference/callbacks/async_callbacks.cc

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ void async_callbacks::add(const types::async_requests type) {
55
requests.insert(requests.end(), type);
66
}
77

8-
bool async_callbacks::invoke(const ecsact_async_events_collector& async_events
8+
void async_callbacks::invoke(const ecsact_async_events_collector* async_events
99
) {
10-
bool breaking_error = false;
10+
if(async_events == nullptr) {
11+
std::unique_lock lk(async_m);
12+
requests.clear();
13+
return;
14+
}
1115

1216
std::vector<types::async_requests> pending_requests;
1317

@@ -18,33 +22,31 @@ bool async_callbacks::invoke(const ecsact_async_events_collector& async_events
1822

1923
for(auto& request : pending_requests) {
2024
std::visit(
21-
[request, &async_events, &breaking_error](auto&& error) {
25+
[request, &async_events](auto&& error) {
2226
using T = std::decay_t<decltype(error)>;
2327
if constexpr(std::is_same_v<T, types::async_error>) {
24-
async_events.async_error_callback(
28+
async_events->async_error_callback(
2529
error.error,
26-
static_cast<ecsact_async_request_id>(error.request_id),
27-
async_events.async_error_callback_user_data
30+
error.request_ids.size(),
31+
error.request_ids.data(),
32+
async_events->async_error_callback_user_data
2833
);
29-
breaking_error = true;
3034
} else if constexpr(std::is_same_v<T, ecsact_execute_systems_error>) {
31-
async_events.system_error_callback(
35+
async_events->system_error_callback(
3236
error,
33-
async_events.system_error_callback_user_data
37+
async_events->system_error_callback_user_data
3438
);
35-
breaking_error = true;
3639
} else if constexpr(std::is_same_v<T, types::entity>) {
37-
async_events.async_entity_callback(
40+
async_events->async_entity_callback(
3841
*error.entity_id,
3942
error.request_id,
40-
async_events.async_entity_error_callback_user_data
43+
async_events->async_entity_error_callback_user_data
4144
);
4245
}
4346
},
4447
request
4548
);
4649
}
47-
return breaking_error;
4850
}
4951

5052
void async_callbacks::add_many(const std::vector<types::async_requests>& types

reference/async_reference/callbacks/async_callbacks.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class async_callbacks {
1111
public:
12-
bool invoke(const ecsact_async_events_collector& async_events);
12+
void invoke(const ecsact_async_events_collector* async_events);
1313
void add(const types::async_requests type);
1414
void add_many(const std::vector<types::async_requests>& types);
1515

reference/async_reference/callbacks/execution_callbacks.cc

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,17 @@ std::vector<types::callback_info> execution_callbacks::remove_callbacks_info =
99
std::mutex execution_callbacks::execution_m = {};
1010

1111
void execution_callbacks::invoke(
12-
const ecsact_execution_events_collector& execution_events,
12+
const ecsact_execution_events_collector* execution_events,
1313
ecsact_registry_id registry_id
1414
) {
15+
if(execution_events == nullptr) {
16+
std::unique_lock lk(execution_m);
17+
init_callbacks_info.clear();
18+
update_callbacks_info.clear();
19+
remove_callbacks_info.clear();
20+
return;
21+
}
22+
1523
std::vector<types::callback_info> init_callbacks;
1624
std::vector<types::callback_info> update_callbacks;
1725
std::vector<types::callback_info> remove_callbacks;
@@ -35,12 +43,12 @@ void execution_callbacks::invoke(
3543
component_info.component_id
3644
);
3745

38-
execution_events.init_callback(
46+
execution_events->init_callback(
3947
component_info.event,
4048
component_info.entity_id,
4149
component_info.component_id,
4250
component_data,
43-
execution_events.init_callback_user_data
51+
execution_events->init_callback_user_data
4452
);
4553
}
4654

@@ -51,12 +59,12 @@ void execution_callbacks::invoke(
5159
component_info.component_id
5260
);
5361

54-
execution_events.update_callback(
62+
execution_events->update_callback(
5563
component_info.event,
5664
component_info.entity_id,
5765
component_info.component_id,
5866
component_data,
59-
execution_events.update_callback_user_data
67+
execution_events->update_callback_user_data
6068
);
6169
}
6270

@@ -67,12 +75,12 @@ void execution_callbacks::invoke(
6775
component_info.component_id
6876
);
6977

70-
execution_events.remove_callback(
78+
execution_events->remove_callback(
7179
component_info.event,
7280
component_info.entity_id,
7381
component_info.component_id,
7482
component_data,
75-
execution_events.remove_callback_user_data
83+
execution_events->remove_callback_user_data
7684
);
7785
}
7886
}

reference/async_reference/callbacks/execution_callbacks.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
class execution_callbacks {
1010
public:
1111
void invoke(
12-
const ecsact_execution_events_collector& execution_events,
12+
const ecsact_execution_events_collector* execution_events,
1313
ecsact_registry_id registry_id
1414
);
1515

0 commit comments

Comments
 (0)