 #include "rpc_util.h"
 #include "src/__support/CPP/optional.h"
-#include "src/__support/GPU/utils.h"
 #include "src/__support/macros/config.h"
 
 #include <stdint.h>
@@ -38,6 +37,9 @@ namespace rpc {
 #define __scoped_atomic_fetch_and(src, val, ord, scp)                         \
   __atomic_fetch_and(src, val, ord)
 #endif
+#if !__has_builtin(__scoped_atomic_thread_fence)
+#define __scoped_atomic_thread_fence(ord, scp) __atomic_thread_fence(ord)
+#endif
 
 /// A fixed size channel used to communicate between the RPC client and server.
 struct Buffer {
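Editor's note on the new fallback above: when the scoped fence builtin is missing, the macro simply drops the memory-scope argument. A minimal sketch of what that means in practice, assuming a toolchain without `__scoped_atomic_thread_fence` (the `__MEMORY_SCOPE_*` stand-in below is defined only to keep the sketch self-contained):

```cpp
// Sketch only: behaviour of the fallback on a toolchain that lacks the
// scoped builtin. Not part of the header.
#ifndef __MEMORY_SCOPE_SYSTEM
#define __MEMORY_SCOPE_SYSTEM 0 // stand-in so the example compiles anywhere
#endif
#if !__has_builtin(__scoped_atomic_thread_fence)
#define __scoped_atomic_thread_fence(ord, scp) __atomic_thread_fence(ord)
#endif

static void fence_example() {
  // The scope argument is discarded by the fallback, so this is simply a
  // release fence, i.e. __atomic_thread_fence(__ATOMIC_RELEASE).
  __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM);
}
```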
@@ -110,14 +112,14 @@ template <bool Invert> struct Process {
 
   /// Retrieve the inbox state from memory shared between processes.
   LIBC_INLINE uint32_t load_inbox(uint64_t lane_mask, uint32_t index) const {
-    return gpu::broadcast_value(
+    return rpc::broadcast_value(
         lane_mask, __scoped_atomic_load_n(&inbox[index], __ATOMIC_RELAXED,
                                           __MEMORY_SCOPE_SYSTEM));
   }
 
   /// Retrieve the outbox state from memory shared between processes.
   LIBC_INLINE uint32_t load_outbox(uint64_t lane_mask, uint32_t index) const {
-    return gpu::broadcast_value(
+    return rpc::broadcast_value(
         lane_mask, __scoped_atomic_load_n(&outbox[index], __ATOMIC_RELAXED,
                                           __MEMORY_SCOPE_SYSTEM));
   }
@@ -128,7 +130,7 @@ template <bool Invert> struct Process {
   /// cheaper than calling load_outbox to get the value to store.
   LIBC_INLINE uint32_t invert_outbox(uint32_t index, uint32_t current_outbox) {
     uint32_t inverted_outbox = !current_outbox;
-    __atomic_thread_fence(__ATOMIC_RELEASE);
+    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM);
     __scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED,
                             __MEMORY_SCOPE_SYSTEM);
     return inverted_outbox;
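The release fence here pairs with the acquire fence after the inbox spin in the next hunk: writes to the shared buffer must be visible before the outbox flip is observed. A minimal, hedged sketch of that protocol using plain (unscoped) builtins; the names below are illustrative and not part of the header:

```cpp
#include <stdint.h>

static int payload = 0;      // stands in for the shared Buffer contents
static uint32_t mailbox = 0; // stands in for an outbox/inbox entry

static void producer() { // roughly the invert_outbox() side
  payload = 42;
  __atomic_thread_fence(__ATOMIC_RELEASE); // order writes before the flip
  __atomic_store_n(&mailbox, 1u, __ATOMIC_RELAXED);
}

static void consumer() { // roughly the spin-then-acquire side
  while (!__atomic_load_n(&mailbox, __ATOMIC_RELAXED))
    ;
  __atomic_thread_fence(__ATOMIC_ACQUIRE); // pairs with the release fence
  (void)payload;                           // now guaranteed to observe 42
}
```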
@@ -142,7 +144,7 @@ template <bool Invert> struct Process {
       sleep_briefly();
       in = load_inbox(lane_mask, index);
     }
-    __atomic_thread_fence(__ATOMIC_ACQUIRE);
+    __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_SYSTEM);
   }
 
   /// The packet is a linearly allocated array of buffers used to communicate
@@ -162,9 +164,10 @@ template <bool Invert> struct Process {
 
   /// Attempt to claim the lock at index. Return true on lock taken.
   /// lane_mask is a bitmap of the threads in the warp that would hold the
-  /// single lock on success, e.g. the result of gpu::get_lane_mask()
+  /// single lock on success, e.g. the result of rpc::get_lane_mask()
   /// The lock is held when the n-th bit of the lock bitfield is set.
-  LIBC_INLINE bool try_lock(uint64_t lane_mask, uint32_t index) {
+  [[clang::convergent]] LIBC_INLINE bool try_lock(uint64_t lane_mask,
+                                                  uint32_t index) {
     // On amdgpu, test and set to the nth lock bit and a sync_lane would suffice
     // On volta, need to handle differences between the threads running and
     // the threads that were detected in the previous call to get_lane_mask()
@@ -173,12 +176,12 @@ template <bool Invert> struct Process {
     // There may be threads active which are not in lane mask which must not
     // succeed in taking the lock, as otherwise it will leak. This is handled
     // by making threads which are not in lane_mask or with 0, a no-op.
-    uint32_t id = gpu::get_lane_id();
+    uint32_t id = rpc::get_lane_id();
     bool id_in_lane_mask = lane_mask & (1ul << id);
 
     // All threads in the warp call fetch_or. Possibly at the same time.
     bool before = set_nth(lock, index, id_in_lane_mask);
-    uint64_t packed = gpu::ballot(lane_mask, before);
+    uint64_t packed = rpc::ballot(lane_mask, before);
 
     // If every bit set in lane_mask is also set in packed, every single thread
     // in the warp failed to get the lock. Ballot returns unset for threads not
@@ -198,22 +201,23 @@ template <bool Invert> struct Process {
     // inlining the current function.
     bool holding_lock = lane_mask != packed;
     if (holding_lock)
-      __atomic_thread_fence(__ATOMIC_ACQUIRE);
+      __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_DEVICE);
     return holding_lock;
   }
 
   /// Unlock the lock at index. We need a lane sync to keep this function
   /// convergent, otherwise the compiler will sink the store and deadlock.
-  LIBC_INLINE void unlock(uint64_t lane_mask, uint32_t index) {
+  [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
+                                                uint32_t index) {
     // Do not move any writes past the unlock.
-    __atomic_thread_fence(__ATOMIC_RELEASE);
+    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_DEVICE);
 
     // Use exactly one thread to clear the nth bit in the lock array Must
     // restrict to a single thread to avoid one thread dropping the lock, then
     // an unrelated warp claiming the lock, then a second thread in this warp
     // dropping the lock again.
-    clear_nth(lock, index, gpu::is_first_lane(lane_mask));
-    gpu::sync_lane(lane_mask);
+    clear_nth(lock, index, rpc::is_first_lane(lane_mask));
+    rpc::sync_lane(lane_mask);
   }
 
   /// Number of bytes to allocate for an inbox or outbox.
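For the `lane_mask != packed` test that `try_lock` performs on the result of `rpc::ballot`, here is a small hedged illustration of the decision itself (the masks are hypothetical; `ballot` and the `fetch_or` race are not modelled):

```cpp
#include <assert.h>
#include <stdint.h>

// 'packed' has a bit set for every lane in lane_mask whose fetch_or saw the
// lock bit already set. If every active lane saw it set, another warp holds
// the lock; any mismatch means this warp just acquired it.
static bool warp_holds_lock(uint64_t lane_mask, uint64_t packed) {
  return lane_mask != packed;
}

int main() {
  const uint64_t lane_mask = 0xF;                 // four active lanes (hypothetical)
  assert(warp_holds_lock(lane_mask, 0x0));        // no lane saw it set: acquired
  assert(!warp_holds_lock(lane_mask, lane_mask)); // every lane saw it set: busy
  return 0;
}
```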
@@ -276,9 +280,9 @@ template <typename F>
 LIBC_INLINE static void invoke_rpc(F &&fn, uint32_t lane_size,
                                    uint64_t lane_mask, Buffer *slot) {
   if constexpr (is_process_gpu()) {
-    fn(&slot[gpu::get_lane_id()], gpu::get_lane_id());
+    fn(&slot[rpc::get_lane_id()], rpc::get_lane_id());
   } else {
-    for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
+    for (uint32_t i = 0; i < lane_size; i += rpc::get_num_lanes())
       if (lane_mask & (1ul << i))
         fn(&slot[i], i);
   }
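The non-GPU branch above now strides by `rpc::get_num_lanes()`. A hedged sketch of what that loop does, assuming the host-side stub reports a single lane so the stride is modelled as 1 (`Slot` is a stand-in for `Buffer`; none of these names come from the patch):

```cpp
#include <stdint.h>
#include <stdio.h>

struct Slot {
  uint64_t data[8]; // stand-in for Buffer
};

// Visit every lane slot the GPU client used and invoke the callback only for
// lanes present in the mask.
template <typename F>
static void invoke_on_host(F &&fn, uint32_t lane_size, uint64_t lane_mask,
                           Slot *slots) {
  for (uint32_t i = 0; i < lane_size; i += 1) // assumed host lane count of 1
    if (lane_mask & (1ul << i))
      fn(&slots[i], i);
}

int main() {
  Slot slots[32] = {};
  // Hypothetical client warp of 32 lanes with only lanes 0 and 5 active.
  invoke_on_host([](Slot *s, uint32_t id) { (void)s; printf("lane %u\n", id); },
                 /*lane_size=*/32, /*lane_mask=*/(1ul << 0) | (1ul << 5), slots);
  return 0;
}
```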
@@ -323,7 +327,7 @@ template <bool T> struct Port {
 
   LIBC_INLINE void close() {
     // Wait for all lanes to finish using the port.
-    gpu::sync_lane(lane_mask);
+    rpc::sync_lane(lane_mask);
 
     // The server is passive, if it own the buffer when it closes we need to
     // give ownership back to the client.
@@ -466,7 +470,7 @@ LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
   uint64_t mask = process.header[index].mask;
-  while (gpu::ballot(mask, idx < num_sends)) {
+  while (rpc::ballot(mask, idx < num_sends)) {
     send([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                          ? sizeof(Buffer::data)
@@ -499,7 +503,7 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
   });
   uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
   uint64_t mask = process.header[index].mask;
-  while (gpu::ballot(mask, idx < num_recvs)) {
+  while (rpc::ballot(mask, idx < num_recvs)) {
     recv([=](Buffer *buffer, uint32_t id) {
       uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                          ? sizeof(Buffer::data)
@@ -517,16 +521,17 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
 /// port. Each port instance uses an associated \p opcode to tell the server
 /// what to do. The Client interface provides the appropriate lane size to the
 /// port using the platform's returned value.
-template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
+template <uint16_t opcode>
+[[clang::convergent]] LIBC_INLINE Client::Port Client::open() {
   // Repeatedly perform a naive linear scan for a port that can be opened to
   // send data.
-  for (uint32_t index = gpu::get_cluster_id();; ++index) {
+  for (uint32_t index = 0;; ++index) {
     // Start from the beginning if we run out of ports to check.
     if (index >= process.port_count)
       index = 0;
 
     // Attempt to acquire the lock on this index.
-    uint64_t lane_mask = gpu::get_lane_mask();
+    uint64_t lane_mask = rpc::get_lane_mask();
     if (!process.try_lock(lane_mask, index))
       continue;
 
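With the now-convergent `Client::open`, a device-side caller looks roughly like the following. This is a hedged usage sketch, not code from the patch: the opcode constant and payload layout are purely illustrative, and it assumes this header and its namespaces are already available.

```cpp
// Hypothetical opcode; real opcode values are defined elsewhere in the library.
constexpr uint16_t EXAMPLE_OPCODE = 0x1234;

LIBC_INLINE void example_client_call(rpc::Client &client, uint64_t value) {
  rpc::Client::Port port = client.open<EXAMPLE_OPCODE>();
  port.send([&](rpc::Buffer *buffer, uint32_t) {
    buffer->data[0] = value; // stash the payload in the fixed-size packet
  });
  port.close();
}
```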
@@ -540,22 +545,22 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
       continue;
     }
 
-    if (gpu::is_first_lane(lane_mask)) {
+    if (rpc::is_first_lane(lane_mask)) {
       process.header[index].opcode = opcode;
       process.header[index].mask = lane_mask;
     }
-    gpu::sync_lane(lane_mask);
-    return Port(process, lane_mask, gpu::get_lane_size(), index, out);
+    rpc::sync_lane(lane_mask);
+    return Port(process, lane_mask, rpc::get_num_lanes(), index, out);
   }
 }
 
 /// Attempts to open a port to use as the server. The server can only open a
 /// port if it has a pending receive operation
-LIBC_INLINE cpp::optional<typename Server::Port>
+[[clang::convergent]] LIBC_INLINE cpp::optional<typename Server::Port>
 Server::try_open(uint32_t lane_size, uint32_t start) {
   // Perform a naive linear scan for a port that has a pending request.
   for (uint32_t index = start; index < process.port_count; ++index) {
-    uint64_t lane_mask = gpu::get_lane_mask();
+    uint64_t lane_mask = rpc::get_lane_mask();
     uint32_t in = process.load_inbox(lane_mask, index);
     uint32_t out = process.load_outbox(lane_mask, index);
 
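On the host side, the convergent `Server::try_open` is typically polled. A hedged sketch, assuming `cpp::optional` follows the usual optional interface and that `lane_size` comes from the client's launch configuration; the handler body is hypothetical:

```cpp
LIBC_INLINE void example_server_poll(rpc::Server &server, uint32_t lane_size) {
  auto maybe_port = server.try_open(lane_size, /*start=*/0);
  if (!maybe_port)
    return; // nothing pending on any port
  rpc::Server::Port &port = *maybe_port;
  port.recv([](rpc::Buffer *buffer, uint32_t id) {
    // Hypothetical handling of buffer->data for lane 'id'.
    (void)buffer;
    (void)id;
  });
  port.close();
}
```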
@@ -595,6 +600,9 @@ LIBC_INLINE Server::Port Server::open(uint32_t lane_size) {
 #undef __scoped_atomic_fetch_or
 #undef __scoped_atomic_fetch_and
 #endif
+#if !__has_builtin(__scoped_atomic_thread_fence)
+#undef __scoped_atomic_thread_fence
+#endif
 
 } // namespace rpc
 } // namespace LIBC_NAMESPACE_DECL