Skip to content

[Distributed] Offer LocalTestingDistributedActorSystem #41756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 16, 2022
9 changes: 7 additions & 2 deletions stdlib/public/Distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# This source file is part of the Swift.org open source project
#
# Copyright (c) 2019 - 2020 Apple Inc. and the Swift project authors
# Copyright (c) 2019 - 2022 Apple Inc. and the Swift project authors
# Licensed under Apache License v2.0 with Runtime Library Exception
#
# See https://swift.org/LICENSE.txt for license information
Expand All @@ -19,13 +19,18 @@ add_swift_target_library(swift_Distributed ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
DistributedActor.swift
DistributedActorSystem.swift
DistributedMetadata.swift
LocalTestingDistributedActorSystem.swift

SWIFT_MODULE_DEPENDS_IOS Darwin
SWIFT_MODULE_DEPENDS_OSX Darwin
SWIFT_MODULE_DEPENDS_TVOS Darwin
SWIFT_MODULE_DEPENDS_WATCHOS Darwin
SWIFT_MODULE_DEPENDS_LINUX Glibc
SWIFT_MODULE_DEPENDS_FREEBSD Glibc
SWIFT_MODULE_DEPENDS_OPENBSD Glibc
SWIFT_MODULE_DEPENDS_CYGWIN Glibc
SWIFT_MODULE_DEPENDS_HAIKU Glibc
SWIFT_MODULE_DEPENDS_WINDOWS CRT
SWIFT_MODULE_DEPENDS_WINDOWS CRT WinSDK

LINK_LIBRARIES ${swift_distributed_link_libraries}

Expand Down
4 changes: 2 additions & 2 deletions stdlib/public/Distributed/DistributedActorSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public protocol DistributedActorSystem: Sendable {
///
/// The `actor.id` of the passed actor must be an `ActorID` that this system previously has assigned.
///
/// If the `actorReady` gets called with some unknown ID, it should crash immediately as it signifies some
/// If `actorReady` gets called with some unknown ID, it should crash immediately as it signifies some
/// very unexpected use of the system.
///
/// - Parameter actor: reference to the (local) actor that was just fully initialized.
Expand All @@ -93,7 +93,7 @@ public protocol DistributedActorSystem: Sendable {
/// and not re-cycled by the system), i.e. if it is called during a failure to initialize completely,
/// the call from the actor's deinitalizer will not happen (as under these circumstances, `deinit` will be run).
///
/// If the `actorReady` gets called with some unknown ID, it should crash immediately as it signifies some
/// If `resignID` gets called with some unknown ID, it should crash immediately as it signifies some
/// very unexpected use of the system.
///
/// - Parameter id: the id of an actor managed by this system that has begun its `deinit`.
Expand Down
284 changes: 284 additions & 0 deletions stdlib/public/Distributed/LocalTestingDistributedActorSystem.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import Swift

#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif os(Windows)
import WinSDK
#endif

/// A `DistributedActorSystem` designed for local only testing.
///
/// It will crash on any attempt of remote communication, but can be useful
/// for learning about `distributed actor` isolation, as well as early
/// prototyping stages of development where a real system is not necessary yet.
@available(SwiftStdlib 5.7, *)
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
public typealias ActorID = LocalTestingActorAddress
public typealias InvocationEncoder = LocalTestingInvocationEncoder
public typealias InvocationDecoder = LocalTestingInvocationDecoder
public typealias SerializationRequirement = Codable

private var activeActors: [ActorID: any DistributedActor] = [:]
private let activeActorsLock = _Lock()

private var idProvider: ActorIDProvider = ActorIDProvider()
private var assignedIDs: Set<ActorID> = []
private let assignedIDsLock = _Lock()

public init() {}

public func resolve<Act>(id: ActorID, as actorType: Act.Type)
throws -> Act? where Act: DistributedActor {
guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else {
throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally")
}
guard let actor = anyActor as? Act else {
throw LocalTestingDistributedActorSystemError(message: "Failed to resolve id '\(id)' as \(Act.Type.self)")
}
return actor
}

public func assignID<Act>(_ actorType: Act.Type) -> ActorID
where Act: DistributedActor {
let id = self.idProvider.next()
self.assignedIDsLock.withLock {
self.assignedIDs.insert(id)
}
return id
}

public func actorReady<Act>(_ actor: Act)
where Act: DistributedActor,
Act.ID == ActorID {
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else {
fatalError("Attempted to mark an unknown actor '\(actor.id)' ready")
}
self.activeActorsLock.withLock {
self.activeActors[actor.id] = actor
}
}

public func resignID(_ id: ActorID) {
self.activeActorsLock.withLock {
self.activeActors.removeValue(forKey: id)
}
}

public func makeInvocationEncoder() -> InvocationEncoder {
.init()
}

public func remoteCall<Act, Err, Res>(
on actor: Act,
target: RemoteCallTarget,
invocation: inout InvocationEncoder,
throwing errorType: Err.Type,
returning returnType: Res.Type
) async throws -> Res
where Act: DistributedActor,
Act.ID == ActorID,
Err: Error,
Res: SerializationRequirement {
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
}

public func remoteCallVoid<Act, Err>(
on actor: Act,
target: RemoteCallTarget,
invocation: inout InvocationEncoder,
throwing errorType: Err.Type
) async throws
where Act: DistributedActor,
Act.ID == ActorID,
Err: Error {
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
}

private struct ActorIDProvider {
private var counter: Int = 0
private let counterLock = _Lock()

init() {}

mutating func next() -> LocalTestingActorAddress {
let id: Int = self.counterLock.withLock {
self.counter += 1
return self.counter
}
return LocalTestingActorAddress(parse: "\(id)")
}
}
}

@available(SwiftStdlib 5.7, *)
public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
public let address: String

public init(parse address: String) {
self.address = address
}

public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.address = try container.decode(String.self)
}

public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.address)
}
}

@available(SwiftStdlib 5.7, *)
public struct LocalTestingInvocationEncoder: DistributedTargetInvocationEncoder {
public typealias SerializationRequirement = Codable

public mutating func recordGenericSubstitution<T>(_ type: T.Type) throws {
fatalError("Attempted to call encoder method in a local-only actor system")
}

public mutating func recordArgument<Argument: SerializationRequirement>(_ argument: Argument) throws {
fatalError("Attempted to call encoder method in a local-only actor system")
}

public mutating func recordErrorType<E: Error>(_ type: E.Type) throws {
fatalError("Attempted to call encoder method in a local-only actor system")
}

public mutating func recordReturnType<R: SerializationRequirement>(_ type: R.Type) throws {
fatalError("Attempted to call encoder method in a local-only actor system")
}

public mutating func doneRecording() throws {
fatalError("Attempted to call encoder method in a local-only actor system")
}
}

@available(SwiftStdlib 5.7, *)
public final class LocalTestingInvocationDecoder : DistributedTargetInvocationDecoder {
public typealias SerializationRequirement = Codable

public func decodeGenericSubstitutions() throws -> [Any.Type] {
fatalError("Attempted to call decoder method in a local-only actor system")
}

public func decodeNextArgument<Argument: SerializationRequirement>() throws -> Argument {
fatalError("Attempted to call decoder method in a local-only actor system")
}

public func decodeErrorType() throws -> Any.Type? {
fatalError("Attempted to call decoder method in a local-only actor system")
}

public func decodeReturnType() throws -> Any.Type? {
fatalError("Attempted to call decoder method in a local-only actor system")
}
}

// === errors ----------------------------------------------------------------

@available(SwiftStdlib 5.7, *)
public struct LocalTestingDistributedActorSystemError: DistributedActorSystemError {
public let message: String

public init(message: String) {
self.message = message
}
}

// === lock ----------------------------------------------------------------

@available(SwiftStdlib 5.7, *)
fileprivate class _Lock {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
private let underlying: UnsafeMutablePointer<os_unfair_lock>
#elseif os(Windows)
private let underlying: UnsafeMutablePointer<SRWLOCK>
#elseif os(WASI)
// pthread is currently not available on WASI
#elseif os(Cygwin) || os(FreeBSD) || os(OpenBSD)
private let underlying: UnsafeMutablePointer<pthread_mutex_t?>
#else
private let underlying: UnsafeMutablePointer<pthread_mutex_t>
#endif

deinit {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
// `os_unfair_lock`s do not need to be explicitly destroyed
#elseif os(Windows)
// `SRWLOCK`s do not need to be explicitly destroyed
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_destroy(self.underlying) == 0 else {
fatalError("pthread_mutex_destroy failed")
}
#endif

#if !os(WASI)
self.underlying.deinitialize(count: 1)
self.underlying.deallocate()
#endif
}

init() {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
#elseif os(Windows)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
InitializeSRWLock(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
guard pthread_mutex_init(self.underlying, nil) == 0 else {
fatalError("pthread_mutex_init failed")
}
#endif
}

@discardableResult
func withLock<T>(_ body: () -> T) -> T {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
os_unfair_lock_lock(self.underlying)
#elseif os(Windows)
AcquireSRWLockExclusive(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_lock(self.underlying) == 0 else {
fatalError("pthread_mutex_lock failed")
}
#endif

defer {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
os_unfair_lock_unlock(self.underlying)
#elseif os(Windows)
ReleaseSRWLockExclusive(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_unlock(self.underlying) == 0 else {
fatalError("pthread_mutex_unlock failed")
}
#endif
}

return body()
}
}
36 changes: 36 additions & 0 deletions test/Distributed/Runtime/distributed_actor_localSystem.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// RUN: %empty-directory(%t)
// RUN: %target-build-swift -module-name main -Xfrontend -enable-experimental-distributed -Xfrontend -disable-availability-checking -j2 -parse-as-library -I %t %s -o %t/a.out
// RUN: %target-run %t/a.out | %FileCheck %s --color

// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: distributed

// rdar://76038845
// UNSUPPORTED: use_os_stdlib
// UNSUPPORTED: back_deployment_runtime

import Distributed

distributed actor Worker {
typealias ActorSystem = LocalTestingDistributedActorSystem

distributed func hi() {
print("hi!")
}

nonisolated var description: Swift.String {
"Worker(\(id))"
}
}

// ==== Execute ----------------------------------------------------------------
@main struct Main {
static func main() async throws {
let system = LocalTestingDistributedActorSystem()

let actor = Worker(system: system)
try await actor.hi() // local calls should still just work
// CHECK: hi!
}
}