Skip to content

Commit 74ab478

Browse files
committed
[Distributed] Implement func metadata and executeDistributedTarget
dont expose new entrypoints able to get all the way to calling _execute
1 parent d39de7f commit 74ab478

File tree

12 files changed

+834
-128
lines changed

12 files changed

+834
-128
lines changed

include/swift/AST/KnownIdentifiers.def

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ IDENTIFIER(using)
266266
IDENTIFIER(assignID)
267267
IDENTIFIER(resignID)
268268
IDENTIFIER(resolve)
269+
IDENTIFIER(remoteCall)
270+
IDENTIFIER(makeInvocation)
269271
IDENTIFIER(system)
270272
IDENTIFIER(ID)
271273
IDENTIFIER(id)

lib/IRGen/Callee.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ namespace irgen {
176176
AsyncLetGetThrowing,
177177
AsyncLetFinish,
178178
TaskGroupWaitNext,
179+
DistributedExecuteTarget,
179180
};
180181

181182
class Kind {
@@ -221,6 +222,7 @@ namespace irgen {
221222
case SpecialKind::AsyncLetWait:
222223
case SpecialKind::AsyncLetWaitThrowing:
223224
case SpecialKind::TaskGroupWaitNext:
225+
case SpecialKind::DistributedExecuteTarget:
224226
return false;
225227
}
226228

@@ -243,6 +245,7 @@ namespace irgen {
243245
case SpecialKind::AsyncLetGetThrowing:
244246
case SpecialKind::AsyncLetFinish:
245247
case SpecialKind::TaskGroupWaitNext:
248+
case SpecialKind::DistributedExecuteTarget:
246249
return true;
247250
}
248251
llvm_unreachable("covered switch");

lib/IRGen/IRGenSIL.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2572,6 +2572,9 @@ FunctionPointer::Kind irgen::classifyFunctionPointerKind(SILFunction *fn) {
25722572

25732573
if (name.equals("swift_taskGroup_wait_next_throwing"))
25742574
return SpecialKind::TaskGroupWaitNext;
2575+
2576+
if (name.equals("swift_distributed_execute_target"))
2577+
return SpecialKind::DistributedExecuteTarget;
25752578
}
25762579

25772580
return fn->getLoweredFunctionType();

stdlib/public/Concurrency/Actor.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,16 +2056,20 @@ static void ::swift_distributed_execute_target_resume(
20562056

20572057
SWIFT_CC(swiftasync)
20582058
void ::swift_distributed_execute_target(
2059-
SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, DefaultActor *actor,
2060-
const char *targetNameStart, size_t targetNameLength, void *argumentBuffer,
2059+
SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
2060+
DefaultActor *actor,
2061+
const char *targetNameStart, size_t targetNameLength,
2062+
void *argumentBuffer,
20612063
void *resultBuffer) {
20622064
auto *accessor = findDistributedAccessor(targetNameStart, targetNameLength);
2063-
2064-
if (!accessor)
2065+
if (!accessor) {
2066+
assert(false && "no accessor");
20652067
return;
2068+
}
20662069

20672070
auto *asyncFnPtr = reinterpret_cast<
20682071
const AsyncFunctionPointer<DistributedAccessorSignature> *>(accessor);
2072+
assert(asyncFnPtr && "no function pointer for distributed_execute_target");
20692073

20702074
DistributedAccessorSignature::FunctionType *accessorEntry =
20712075
asyncFnPtr->Function.get();

stdlib/public/Distributed/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ add_swift_target_library(swift_Distributed ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
1818
AssertDistributed.swift
1919
DistributedActor.swift
2020
DistributedActorSystem.swift
21+
DistributedMetadata.swift
2122
HeterogeneousBuffer.swift
2223

2324
SWIFT_MODULE_DEPENDS_LINUX Glibc

stdlib/public/Distributed/DistributedActor.swift

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,6 @@ extension DistributedActor {
144144
}
145145
}
146146

147-
/******************************************************************************/
148-
/******************************** Misc ****************************************/
149-
/******************************************************************************/
150-
151-
/// Error protocol to which errors thrown by any `DistributedActorSystem` should conform.
152-
@available(SwiftStdlib 5.6, *)
153-
public protocol DistributedActorSystemError: Error {
154-
}
155-
156-
@available(SwiftStdlib 5.6, *)
157-
public struct DistributedActorCodingError: DistributedActorSystemError {
158-
public let message: String
159-
160-
public init(message: String) {
161-
self.message = message
162-
}
163-
164-
public static func missingActorSystemUserInfo<Act>(_ actorType: Act.Type) -> Self
165-
where Act: DistributedActor {
166-
.init(message: "Missing DistributedActorSystem userInfo while decoding")
167-
}
168-
}
169-
170147
/******************************************************************************/
171148
/************************* Runtime Functions **********************************/
172149
/******************************************************************************/

stdlib/public/Distributed/DistributedActorSystem.swift

Lines changed: 141 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,11 @@ public protocol DistributedActorSystem: Sendable {
126126
// target: RemoteCallTarget,
127127
// arguments: Invocation,
128128
// throwing: Err.Type,
129-
// returning: Res.Type // TODO: to make it `: SerializationRequirement` it'd need to be an ad hoc requirement
129+
// returning: Res.Type
130130
// ) async throws -> Res.Type
131131
// where Act: DistributedActor,
132-
// Act.ID == ActorID
132+
// Act.ID == ActorID,
133+
// Res: SerializationRequirement
133134

134135
}
135136

@@ -152,15 +153,125 @@ extension DistributedActorSystem {
152153
/// The reason for this API using a `ResultHandler` rather than returning values directly,
153154
/// is that thanks to this approach it can avoid any existential boxing, and can serve the most
154155
/// latency sensitive-use-cases.
155-
func executeDistributedTarget<Act, ResultHandler>(
156+
public func executeDistributedTarget<Act, ResultHandler>(
156157
on actor: Act,
157-
mangledMethodName: String,
158+
mangledTargetName: String,
158159
invocation: Self.Invocation,
159160
handler: ResultHandler
160161
) async throws where Act: DistributedActor,
161162
Act.ID == ActorID,
162163
ResultHandler: DistributedTargetInvocationResultHandler {
163-
fatalError("TODO: synthesize and invoke the _executeDistributedTarget")
164+
// NOTE: this implementation is not the most efficient, nor final, version of this func
165+
// we end up demangling the name multiple times, perform more heap allocations than
166+
// we truly need to etc. We'll eventually move this implementation to a specialized one
167+
// avoiding these issues.
168+
169+
guard mangledTargetName.count > 0 && mangledTargetName.first == "$" else {
170+
throw ExecuteDistributedTargetError(
171+
message: "Illegal mangledTargetName detected, must start with '$'")
172+
}
173+
174+
// Get the expected parameter count of the func
175+
let nameUTF8 = Array(mangledTargetName.utf8)
176+
let paramCount = nameUTF8.withUnsafeBufferPointer { nameUTF8 in
177+
__getParameterCount(nameUTF8.baseAddress!, UInt(nameUTF8.endIndex))
178+
}
179+
180+
guard paramCount >= 0 else {
181+
throw ExecuteDistributedTargetError(
182+
message: """
183+
Failed to decode distributed invocation target expected parameter count,
184+
error code: \(paramCount)
185+
mangled name: \(mangledTargetName)
186+
""")
187+
}
188+
189+
// Prepare buffer for the parameter types to be decoded into:
190+
let paramTypesBuffer = UnsafeMutableRawBufferPointer
191+
.allocate(byteCount: MemoryLayout<Any.Type>.size * Int(paramCount),
192+
alignment: MemoryLayout<Any.Type>.alignment)
193+
defer {
194+
paramTypesBuffer.deallocate()
195+
}
196+
197+
// Demangle and write all parameter types into the prepared buffer
198+
let decodedNum = nameUTF8.withUnsafeBufferPointer { nameUTF8 in
199+
__getParameterTypeInfo(
200+
nameUTF8.baseAddress!, UInt(nameUTF8.endIndex),
201+
paramTypesBuffer.baseAddress!._rawValue, Int(paramCount))
202+
}
203+
204+
// Fail if the decoded parameter types count seems off and fishy
205+
guard decodedNum == paramCount else {
206+
throw ExecuteDistributedTargetError(
207+
message: """
208+
Failed to decode the expected number of params of distributed invocation target, error code: \(decodedNum)
209+
(decoded: \(decodedNum), expected params: \(paramCount)
210+
mangled name: \(mangledTargetName)
211+
""")
212+
}
213+
214+
// Copy the types from the buffer into a Swift Array
215+
var paramTypes: [Any.Type] = []
216+
do {
217+
paramTypes.reserveCapacity(Int(decodedNum))
218+
for paramType in paramTypesBuffer.bindMemory(to: Any.Type.self) {
219+
paramTypes.append(paramType)
220+
}
221+
}
222+
223+
// Decode the return type
224+
func allocateReturnTypeBuffer<R>(_: R.Type) -> UnsafeRawPointer? {
225+
if R.self == Void.self {
226+
return nil
227+
}
228+
return UnsafeRawPointer(UnsafeMutablePointer<R>.allocate(capacity: 1))
229+
}
230+
guard let returnType: Any.Type = _getReturnTypeInfo(mangledMethodName: mangledTargetName) else {
231+
throw ExecuteDistributedTargetError(
232+
message: "Failed to decode distributed target return type")
233+
}
234+
let resultBuffer = _openExistential(returnType, do: allocateReturnTypeBuffer)
235+
func destroyReturnTypeBuffer<R>(_: R.Type) {
236+
resultBuffer?.assumingMemoryBound(to: R.self).deallocate()
237+
}
238+
defer {
239+
_openExistential(returnType, do: destroyReturnTypeBuffer)
240+
}
241+
242+
// Prepare the buffer to decode the argument values into
243+
let hargs = HeterogeneousBuffer.allocate(forTypes: paramTypes)
244+
defer {
245+
hargs.deinitialize()
246+
hargs.deallocate()
247+
}
248+
249+
do {
250+
// Execute the target!
251+
print("EXECUTE the target: \(mangledTargetName)")
252+
try await _executeDistributedTarget(
253+
on: actor,
254+
// mangledTargetName, UInt(mangledTargetName.count),
255+
// "\(mangledTargetName)TE", UInt(mangledTargetName.count) + 2,
256+
// "$s4main7GreeterC5helloSSyFTE", UInt("$s4main7GreeterC5helloSSyFTE".count),
257+
"$s4main7GreeterC5helloyyFTE", UInt("$s4main7GreeterC5helloyyFTE".count), // no return value
258+
argumentBuffer: hargs.buffer._rawValue,
259+
resultBuffer: resultBuffer?._rawValue
260+
)
261+
262+
// Get the result out of the buffer and invoke onReturn with the right type
263+
guard let resultBuffer = resultBuffer else {
264+
try await handler.onReturn(value: ())
265+
return
266+
}
267+
func onReturn<R>(_: R.Type) async throws {
268+
try await handler.onReturn/*<R>*/(value: resultBuffer)
269+
}
270+
271+
try await _openExistential(returnType, do: onReturn)
272+
} catch {
273+
try await handler.onThrow(error: error)
274+
}
164275
}
165276
}
166277

@@ -170,74 +281,9 @@ func _executeDistributedTarget(
170281
on actor: AnyObject, // DistributedActor
171282
_ targetName: UnsafePointer<UInt8>, _ targetNameLength: UInt,
172283
argumentBuffer: Builtin.RawPointer, // HeterogeneousBuffer of arguments
173-
resultBuffer: Builtin.RawPointer
284+
resultBuffer: Builtin.RawPointer?
174285
) async throws
175286

176-
// {
177-
//
178-
// // TODO: this entire function has to be implemented in SIL, so the below is pseudo-code...
179-
// // ===== SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL ===== //
180-
// // 1) get the substitutions
181-
// // TODO: we should guarantee the order in which we call those -- so the subs first...
182-
// // meaning that we should also ENCODE the subs first (!)
183-
// let subs: [String] = arguments.genericSubstitutions() // TODO: or do we need a streaming decoder here too???
184-
//
185-
// // 2) find the distributed method
186-
// // TODO: if we can in one call, we would prefer that
187-
// let types: ([Any.Type], Any.Type, Any.Type) = try __getTypeInfo(mangledMethodName, subs)
188-
// let argumentTypes: [Any.Type] = try __getArgumentsTypeInfo(mangledMethodName, subs) // TODO: IRGen???
189-
// // ^^^^^ Some of this will have to be in the runtime
190-
// // - demangling generic function type "given the subs"
191-
// // - produce the concrete type
192-
// // - do these things: checkGenericArguments / the runtimes demangling facilities
193-
// // - _getTypeByMangledNameInContext - TODO: exists but is only for Types, we need functions
194-
// // - or this one _getTypeByMangledNameInEnvironment;
195-
// // - we need some other entrypoint tho, to fold in a call to "checkGenericRequirements" checks `where` in runtime
196-
// // - spits out list of witness tables, which generic func invocation needs
197-
// // - there is a way to do it we think, just not exposed?
198-
//
199-
// // 3) stack-allocate the pre-sized HeterogeneousBuffer
200-
// var hbuf = HeterogeneousBuffer.allocate(forTypes: argumentTypes) // STACK ALLOC THIS
201-
// defer {
202-
// hbuf.deinitialize()
203-
// hbuf.deallocate()
204-
// }
205-
//
206-
// // 4) decode all arguments, we can provide the substituted types to it:
207-
// var decoder = arguments.argumentDecoder()
208-
// var offset = 0
209-
// for AnyT in argumentTypes { // TODO: can this be a loop in SIL or we need to unroll....?
210-
// let OpenT = // SIL: open_existential AnyT
211-
// offset = MemoryLayout<OpenT>.nextAlignedOffset(offset) // ??????
212-
// try decoder.decodeNext(as: OpenT, into: hbuf.pointer(at: offset)) // ??????
213-
// }
214-
//
215-
// // 5) look up the accessor
216-
// let distFuncAccessor = __lookupDistributedFuncAccessor(mangledName: mangledMethodName, substitutions: subs)
217-
//
218-
// // 6) invoke the accessor with the prepared het-buffer
219-
// do {
220-
// // indirect return here:
221-
// let RetType: Any.Type = __getReturnTypeInfo(mangledMethodName: mangledMethodName, subs)
222-
// let OpenedRetType = // $open_existential RetType
223-
// let retBuf: UnsafeMutablePointer<OpenedRetType> = .allocate(MemoryLayout<OpenedRetType>.size)
224-
// defer {
225-
// retBuf.deinitialize()
226-
// retBuf.deallocate()
227-
// }
228-
//
229-
// try await __invokeDistributedFuncAccessor(distFuncAccessor, actor, hbuf.buffer, returningInto: retBuf) // pass as UnsafeMutableRawPointer
230-
//
231-
// handler.onReturn(retBuf.pointee)
232-
// } catch {
233-
// // errors are always indirectly
234-
// handler.onThrow(error)
235-
// }
236-
//
237-
// // ===== SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL SIL ===== //
238-
//
239-
// }
240-
241287
// ==== ----------------------------------------------------------------------------------------------------------------
242288
// MARK: Support types
243289

@@ -374,29 +420,37 @@ public protocol DistributedTargetInvocationArgumentDecoder {
374420

375421
@available(SwiftStdlib 5.6, *)
376422
public protocol DistributedTargetInvocationResultHandler {
423+
// FIXME: these must be ad-hoc protocol requirements, because Res: SerializationRequirement !!!
377424
func onReturn<Res>(value: Res) async throws
378-
func onThrow<Err>(error: Err) async throws where Err: Error
425+
func onThrow<Err: Error>(error: Err) async throws
379426
}
380427

381-
// ==== ----------------------------------------------------------------------------------------------------------------
382-
// MARK: Runtime helper functions
428+
/******************************************************************************/
429+
/******************************** Errors **************************************/
430+
/******************************************************************************/
383431

432+
/// Error protocol to which errors thrown by any `DistributedActorSystem` should conform.
384433
@available(SwiftStdlib 5.6, *)
385-
@_silgen_name("swift_distributed_lookupDistributedFuncAccessor")
386-
func __lookupDistributedFuncAccessor(mangledMethodName: String, subs: [(Int, Int, String)]) -> Builtin.RawPointer
434+
public protocol DistributedActorSystemError: Error {}
387435

388436
@available(SwiftStdlib 5.6, *)
389-
@_silgen_name("swift_distributed_invokeDistributedFuncAccessor")
390-
func __invokeDistributedFuncAccessor(
391-
accessor: Builtin.RawPointer,
392-
actor: AnyObject,
393-
buffer: UnsafeMutableRawPointer, // __owned???
394-
returningInto: UnsafeMutableRawPointer) async throws
437+
public struct ExecuteDistributedTargetError: DistributedActorSystemError {
438+
private let message: String
439+
internal init(message: String) {
440+
self.message = message
441+
}
442+
}
395443

396444
@available(SwiftStdlib 5.6, *)
397-
@_silgen_name("swift_distributed_func_getArgumentsTypeInfo")
398-
func __getArgumentsTypeInfo(mangledMethodName: String, subs: [(Int, Int, String)]) -> [Any.Type]
445+
public struct DistributedActorCodingError: DistributedActorSystemError {
446+
public let message: String
399447

400-
@available(SwiftStdlib 5.6, *)
401-
@_silgen_name("swift_distributed_func_getReturnTypeInfo")
402-
func __getReturnTypeInfo(mangledMethodName: String, subs: [(Int, Int, String)]) -> Any.Type
448+
public init(message: String) {
449+
self.message = message
450+
}
451+
452+
public static func missingActorSystemUserInfo<Act>(_ actorType: Act.Type) -> Self
453+
where Act: DistributedActor {
454+
.init(message: "Missing DistributedActorSystem userInfo while decoding")
455+
}
456+
}

0 commit comments

Comments
 (0)