-
Notifications
You must be signed in to change notification settings - Fork 10.5k
[Distributed] Offer LocalTestingDistributedActorSystem #41756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
swift-ci
merged 12 commits into
swiftlang:main
from
yim-lee:distributed/loccal-actor-system
Mar 16, 2022
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
bb90140
Offer LocalTestingDistributedActorSystem
yim-lee 97da205
Fix build errors
yim-lee 290506d
Add LocalTestingActorAddress
yim-lee 1dd0604
Don't print
yim-lee 2e5d4b6
Fix build errors
yim-lee c5254bb
Add @available
yim-lee a5adf26
Workaround swift-syntax-test bug
yim-lee 2adcd72
Make thread safe
yim-lee 143914a
Use os_unfair_lock on Apple platforms
yim-lee b001d5b
swift-syntax-test bug fixed; remove workaround
yim-lee 8a1c513
add test and availability annotations
ktoso 4c09b91
update Distributed module import
ktoso File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
284 changes: 284 additions & 0 deletions
284
stdlib/public/Distributed/LocalTestingDistributedActorSystem.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,284 @@ | ||
//===----------------------------------------------------------------------===// | ||
// | ||
// This source file is part of the Swift.org open source project | ||
// | ||
// Copyright (c) 2022 Apple Inc. and the Swift project authors | ||
// Licensed under Apache License v2.0 with Runtime Library Exception | ||
// | ||
// See https://swift.org/LICENSE.txt for license information | ||
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors | ||
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
import Swift | ||
|
||
#if canImport(Darwin) | ||
import Darwin | ||
#elseif canImport(Glibc) | ||
import Glibc | ||
#elseif os(Windows) | ||
import WinSDK | ||
#endif | ||
|
||
/// A `DistributedActorSystem` designed for local only testing. | ||
/// | ||
/// It will crash on any attempt of remote communication, but can be useful | ||
/// for learning about `distributed actor` isolation, as well as early | ||
/// prototyping stages of development where a real system is not necessary yet. | ||
@available(SwiftStdlib 5.7, *) | ||
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable { | ||
public typealias ActorID = LocalTestingActorAddress | ||
public typealias InvocationEncoder = LocalTestingInvocationEncoder | ||
public typealias InvocationDecoder = LocalTestingInvocationDecoder | ||
public typealias SerializationRequirement = Codable | ||
|
||
private var activeActors: [ActorID: any DistributedActor] = [:] | ||
private let activeActorsLock = _Lock() | ||
|
||
private var idProvider: ActorIDProvider = ActorIDProvider() | ||
private var assignedIDs: Set<ActorID> = [] | ||
private let assignedIDsLock = _Lock() | ||
|
||
public init() {} | ||
|
||
public func resolve<Act>(id: ActorID, as actorType: Act.Type) | ||
throws -> Act? where Act: DistributedActor { | ||
guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else { | ||
throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally") | ||
} | ||
guard let actor = anyActor as? Act else { | ||
throw LocalTestingDistributedActorSystemError(message: "Failed to resolve id '\(id)' as \(Act.Type.self)") | ||
} | ||
return actor | ||
} | ||
ktoso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
public func assignID<Act>(_ actorType: Act.Type) -> ActorID | ||
where Act: DistributedActor { | ||
let id = self.idProvider.next() | ||
self.assignedIDsLock.withLock { | ||
self.assignedIDs.insert(id) | ||
} | ||
return id | ||
} | ||
|
||
public func actorReady<Act>(_ actor: Act) | ||
where Act: DistributedActor, | ||
Act.ID == ActorID { | ||
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else { | ||
fatalError("Attempted to mark an unknown actor '\(actor.id)' ready") | ||
} | ||
self.activeActorsLock.withLock { | ||
self.activeActors[actor.id] = actor | ||
} | ||
} | ||
|
||
public func resignID(_ id: ActorID) { | ||
self.activeActorsLock.withLock { | ||
self.activeActors.removeValue(forKey: id) | ||
} | ||
} | ||
|
||
public func makeInvocationEncoder() -> InvocationEncoder { | ||
.init() | ||
} | ||
|
||
public func remoteCall<Act, Err, Res>( | ||
on actor: Act, | ||
target: RemoteCallTarget, | ||
invocation: inout InvocationEncoder, | ||
throwing errorType: Err.Type, | ||
returning returnType: Res.Type | ||
) async throws -> Res | ||
where Act: DistributedActor, | ||
Act.ID == ActorID, | ||
Err: Error, | ||
Res: SerializationRequirement { | ||
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system") | ||
} | ||
|
||
public func remoteCallVoid<Act, Err>( | ||
on actor: Act, | ||
target: RemoteCallTarget, | ||
invocation: inout InvocationEncoder, | ||
throwing errorType: Err.Type | ||
) async throws | ||
where Act: DistributedActor, | ||
Act.ID == ActorID, | ||
Err: Error { | ||
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system") | ||
} | ||
|
||
private struct ActorIDProvider { | ||
private var counter: Int = 0 | ||
private let counterLock = _Lock() | ||
|
||
init() {} | ||
|
||
mutating func next() -> LocalTestingActorAddress { | ||
let id: Int = self.counterLock.withLock { | ||
self.counter += 1 | ||
return self.counter | ||
} | ||
return LocalTestingActorAddress(parse: "\(id)") | ||
} | ||
} | ||
} | ||
|
||
@available(SwiftStdlib 5.7, *) | ||
public struct LocalTestingActorAddress: Hashable, Sendable, Codable { | ||
public let address: String | ||
|
||
public init(parse address: String) { | ||
self.address = address | ||
} | ||
|
||
public init(from decoder: Decoder) throws { | ||
let container = try decoder.singleValueContainer() | ||
self.address = try container.decode(String.self) | ||
} | ||
|
||
public func encode(to encoder: Encoder) throws { | ||
var container = encoder.singleValueContainer() | ||
try container.encode(self.address) | ||
} | ||
} | ||
|
||
@available(SwiftStdlib 5.7, *) | ||
public struct LocalTestingInvocationEncoder: DistributedTargetInvocationEncoder { | ||
public typealias SerializationRequirement = Codable | ||
|
||
public mutating func recordGenericSubstitution<T>(_ type: T.Type) throws { | ||
fatalError("Attempted to call encoder method in a local-only actor system") | ||
} | ||
|
||
public mutating func recordArgument<Argument: SerializationRequirement>(_ argument: Argument) throws { | ||
fatalError("Attempted to call encoder method in a local-only actor system") | ||
} | ||
|
||
public mutating func recordErrorType<E: Error>(_ type: E.Type) throws { | ||
fatalError("Attempted to call encoder method in a local-only actor system") | ||
} | ||
|
||
public mutating func recordReturnType<R: SerializationRequirement>(_ type: R.Type) throws { | ||
fatalError("Attempted to call encoder method in a local-only actor system") | ||
} | ||
|
||
public mutating func doneRecording() throws { | ||
fatalError("Attempted to call encoder method in a local-only actor system") | ||
} | ||
} | ||
|
||
@available(SwiftStdlib 5.7, *) | ||
public final class LocalTestingInvocationDecoder : DistributedTargetInvocationDecoder { | ||
public typealias SerializationRequirement = Codable | ||
|
||
public func decodeGenericSubstitutions() throws -> [Any.Type] { | ||
fatalError("Attempted to call decoder method in a local-only actor system") | ||
} | ||
|
||
public func decodeNextArgument<Argument: SerializationRequirement>() throws -> Argument { | ||
fatalError("Attempted to call decoder method in a local-only actor system") | ||
} | ||
|
||
public func decodeErrorType() throws -> Any.Type? { | ||
fatalError("Attempted to call decoder method in a local-only actor system") | ||
} | ||
|
||
public func decodeReturnType() throws -> Any.Type? { | ||
fatalError("Attempted to call decoder method in a local-only actor system") | ||
} | ||
} | ||
|
||
// === errors ---------------------------------------------------------------- | ||
|
||
@available(SwiftStdlib 5.7, *) | ||
public struct LocalTestingDistributedActorSystemError: DistributedActorSystemError { | ||
public let message: String | ||
|
||
public init(message: String) { | ||
self.message = message | ||
} | ||
} | ||
|
||
// === lock ---------------------------------------------------------------- | ||
|
||
@available(SwiftStdlib 5.7, *) | ||
fileprivate class _Lock { | ||
ktoso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS) | ||
private let underlying: UnsafeMutablePointer<os_unfair_lock> | ||
#elseif os(Windows) | ||
private let underlying: UnsafeMutablePointer<SRWLOCK> | ||
#elseif os(WASI) | ||
// pthread is currently not available on WASI | ||
#elseif os(Cygwin) || os(FreeBSD) || os(OpenBSD) | ||
private let underlying: UnsafeMutablePointer<pthread_mutex_t?> | ||
#else | ||
private let underlying: UnsafeMutablePointer<pthread_mutex_t> | ||
#endif | ||
|
||
deinit { | ||
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS) | ||
// `os_unfair_lock`s do not need to be explicitly destroyed | ||
#elseif os(Windows) | ||
// `SRWLOCK`s do not need to be explicitly destroyed | ||
#elseif os(WASI) | ||
// WASI environment has only a single thread | ||
#else | ||
guard pthread_mutex_destroy(self.underlying) == 0 else { | ||
fatalError("pthread_mutex_destroy failed") | ||
} | ||
#endif | ||
|
||
#if !os(WASI) | ||
self.underlying.deinitialize(count: 1) | ||
self.underlying.deallocate() | ||
#endif | ||
} | ||
|
||
init() { | ||
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS) | ||
self.underlying = UnsafeMutablePointer.allocate(capacity: 1) | ||
#elseif os(Windows) | ||
self.underlying = UnsafeMutablePointer.allocate(capacity: 1) | ||
InitializeSRWLock(self.underlying) | ||
#elseif os(WASI) | ||
// WASI environment has only a single thread | ||
#else | ||
self.underlying = UnsafeMutablePointer.allocate(capacity: 1) | ||
guard pthread_mutex_init(self.underlying, nil) == 0 else { | ||
fatalError("pthread_mutex_init failed") | ||
} | ||
#endif | ||
} | ||
|
||
@discardableResult | ||
func withLock<T>(_ body: () -> T) -> T { | ||
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS) | ||
os_unfair_lock_lock(self.underlying) | ||
#elseif os(Windows) | ||
AcquireSRWLockExclusive(self.underlying) | ||
#elseif os(WASI) | ||
// WASI environment has only a single thread | ||
#else | ||
guard pthread_mutex_lock(self.underlying) == 0 else { | ||
fatalError("pthread_mutex_lock failed") | ||
} | ||
#endif | ||
|
||
defer { | ||
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS) | ||
os_unfair_lock_unlock(self.underlying) | ||
#elseif os(Windows) | ||
ReleaseSRWLockExclusive(self.underlying) | ||
#elseif os(WASI) | ||
// WASI environment has only a single thread | ||
#else | ||
guard pthread_mutex_unlock(self.underlying) == 0 else { | ||
fatalError("pthread_mutex_unlock failed") | ||
} | ||
#endif | ||
} | ||
|
||
return body() | ||
} | ||
} |
36 changes: 36 additions & 0 deletions
36
test/Distributed/Runtime/distributed_actor_localSystem.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
// RUN: %empty-directory(%t) | ||
// RUN: %target-build-swift -module-name main -Xfrontend -enable-experimental-distributed -Xfrontend -disable-availability-checking -j2 -parse-as-library -I %t %s -o %t/a.out | ||
// RUN: %target-run %t/a.out | %FileCheck %s --color | ||
|
||
// REQUIRES: executable_test | ||
// REQUIRES: concurrency | ||
// REQUIRES: distributed | ||
|
||
// rdar://76038845 | ||
// UNSUPPORTED: use_os_stdlib | ||
// UNSUPPORTED: back_deployment_runtime | ||
|
||
import Distributed | ||
|
||
distributed actor Worker { | ||
typealias ActorSystem = LocalTestingDistributedActorSystem | ||
|
||
distributed func hi() { | ||
print("hi!") | ||
} | ||
|
||
nonisolated var description: Swift.String { | ||
"Worker(\(id))" | ||
} | ||
} | ||
|
||
// ==== Execute ---------------------------------------------------------------- | ||
@main struct Main { | ||
static func main() async throws { | ||
let system = LocalTestingDistributedActorSystem() | ||
|
||
let actor = Worker(system: system) | ||
try await actor.hi() // local calls should still just work | ||
// CHECK: hi! | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.