Skip to content

Commit 5066549

Browse files
committed
added S3 Express Scenario
1 parent 29c17a0 commit 5066549

File tree

6 files changed

+460
-197
lines changed

6 files changed

+460
-197
lines changed

javav2/example_code/s3/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@
157157
<groupId>software.amazon.awssdk</groupId>
158158
<artifactId>iam</artifactId>
159159
</dependency>
160+
<dependency>
161+
<groupId>software.amazon.awssdk</groupId>
162+
<artifactId>ec2</artifactId>
163+
</dependency>
160164
<dependency>
161165
<groupId>org.apache.logging.log4j</groupId>
162166
<artifactId>log4j-core</artifactId>

javav2/example_code/s3/src/main/java/com/example/s3/directorybucket/DeleteDirectoryBucketObjects.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,20 @@ public static void deleteDirectoryBucketObjects(S3Client s3Client, String bucket
6969
logger.info("Deleting objects from bucket: {}", bucketName);
7070

7171
try {
72-
// Create a list of ObjectIdentifier
72+
// Create a list of ObjectIdentifier.
7373
List<ObjectIdentifier> identifiers = objectKeys.stream()
7474
.map(key -> ObjectIdentifier.builder().key(key).build())
7575
.toList();
7676

77-
// Create a Delete object
7877
Delete delete = Delete.builder()
7978
.objects(identifiers)
8079
.build();
8180

82-
// Create a DeleteObjectsRequest
8381
DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
8482
.bucket(bucketName)
8583
.delete(delete)
8684
.build();
8785

88-
// Delete the objects
8986
DeleteObjectsResponse deleteObjectsResponse = s3Client.deleteObjects(deleteObjectsRequest);
9087
deleteObjectsResponse.deleted().forEach(deleted -> logger.info("Deleted object: {}", deleted.key()));
9188

javav2/example_code/s3/src/main/java/com/example/s3/express/S3DirectoriesActions.java

Lines changed: 257 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,26 @@
77
import org.slf4j.LoggerFactory;
88
import software.amazon.awssdk.core.ResponseBytes;
99
import software.amazon.awssdk.core.async.AsyncRequestBody;
10+
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
11+
import software.amazon.awssdk.core.retry.RetryMode;
1012
import software.amazon.awssdk.core.waiters.WaiterResponse;
13+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
14+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
15+
import software.amazon.awssdk.regions.Region;
16+
import software.amazon.awssdk.services.ec2.Ec2AsyncClient;
17+
import software.amazon.awssdk.services.ec2.model.AvailabilityZone;
18+
import software.amazon.awssdk.services.ec2.model.CreateVpcEndpointRequest;
19+
import software.amazon.awssdk.services.ec2.model.CreateVpcRequest;
20+
import software.amazon.awssdk.services.ec2.model.DescribeAvailabilityZonesRequest;
21+
import software.amazon.awssdk.services.ec2.model.DescribeRouteTablesRequest;
22+
import software.amazon.awssdk.services.ec2.model.DescribeVpcsRequest;
23+
import software.amazon.awssdk.services.ec2.model.Ec2Exception;
24+
import software.amazon.awssdk.services.ec2.model.Filter;
25+
import software.amazon.awssdk.services.ec2.waiters.Ec2AsyncWaiter;
26+
import software.amazon.awssdk.services.iam.IamAsyncClient;
27+
import software.amazon.awssdk.services.iam.model.CreateAccessKeyRequest;
28+
import software.amazon.awssdk.services.iam.model.CreateAccessKeyResponse;
29+
import software.amazon.awssdk.services.iam.model.IamException;
1130
import software.amazon.awssdk.services.s3.S3AsyncClient;
1231
import software.amazon.awssdk.services.s3.model.BucketInfo;
1332
import software.amazon.awssdk.services.s3.model.BucketType;
@@ -33,15 +52,68 @@
3352
import software.amazon.awssdk.services.s3.model.S3Exception;
3453
import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter;
3554
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
36-
55+
import java.time.Duration;
56+
import java.util.AbstractMap;
3757
import java.util.List;
58+
import java.util.Scanner;
3859
import java.util.concurrent.CompletableFuture;
3960
import java.util.concurrent.CompletionException;
4061
import java.util.stream.Collectors;
62+
import java.util.stream.IntStream;
4163

4264
public class S3DirectoriesActions {
65+
66+
private static IamAsyncClient iamAsyncClient;
67+
68+
private static Ec2AsyncClient ec2AsyncClient;
4369
private static final Logger logger = LoggerFactory.getLogger(S3DirectoriesActions.class);
4470

71+
private static IamAsyncClient getIAMAsyncClient() {
72+
if (iamAsyncClient == null) {
73+
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
74+
.maxConcurrency(100)
75+
.connectionTimeout(Duration.ofSeconds(60))
76+
.readTimeout(Duration.ofSeconds(60))
77+
.writeTimeout(Duration.ofSeconds(60))
78+
.build();
79+
80+
ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
81+
.apiCallTimeout(Duration.ofMinutes(2))
82+
.apiCallAttemptTimeout(Duration.ofSeconds(90))
83+
.retryStrategy(RetryMode.STANDARD)
84+
.build();
85+
86+
iamAsyncClient = IamAsyncClient.builder()
87+
.httpClient(httpClient)
88+
.overrideConfiguration(overrideConfig)
89+
.build();
90+
}
91+
return iamAsyncClient;
92+
}
93+
94+
private static Ec2AsyncClient getEc2AsyncClient() {
95+
if (ec2AsyncClient == null) {
96+
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
97+
.maxConcurrency(100)
98+
.connectionTimeout(Duration.ofSeconds(60))
99+
.readTimeout(Duration.ofSeconds(60))
100+
.writeTimeout(Duration.ofSeconds(60))
101+
.build();
102+
103+
ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
104+
.apiCallTimeout(Duration.ofMinutes(2))
105+
.apiCallAttemptTimeout(Duration.ofSeconds(90))
106+
.retryStrategy(RetryMode.STANDARD)
107+
.build();
108+
109+
ec2AsyncClient = Ec2AsyncClient.builder()
110+
.httpClient(httpClient)
111+
.region(Region.US_WEST_2)
112+
.overrideConfiguration(overrideConfig)
113+
.build();
114+
}
115+
return ec2AsyncClient;
116+
}
45117

46118
/**
47119
* Deletes the specified S3 bucket and all the objects within it in an asynchronous manner.
@@ -102,7 +174,6 @@ public CompletableFuture<WaiterResponse<HeadBucketResponse>> deleteBucketAndObje
102174
});
103175
}
104176

105-
106177
/**
107178
* Lists the objects in an S3 bucket asynchronously using the AWS SDK.
108179
*
@@ -127,7 +198,6 @@ public CompletableFuture<List<String>> listObjectsAsync(S3AsyncClient s3Client,
127198
}
128199

129200
public CompletableFuture<ResponseBytes<GetObjectResponse>> getObjectAsync(S3AsyncClient s3Client, String bucketName, String keyName) {
130-
// Create the GetObjectRequest for the asynchronous client
131201
GetObjectRequest objectRequest = GetObjectRequest.builder()
132202
.key(keyName)
133203
.bucket(bucketName)
@@ -213,7 +283,7 @@ public CompletableFuture<Void> createSessionAsync(S3AsyncClient s3Client, String
213283
* @param zone The region where the bucket will be created
214284
* @throws S3Exception if there's an error creating the bucket
215285
*/
216-
public static CompletableFuture<Void> createDirectoryBucketAsync(S3AsyncClient s3Client, String bucketName, String zone) {
286+
public CompletableFuture<Void> createDirectoryBucketAsync(S3AsyncClient s3Client, String bucketName, String zone) {
217287
logger.info("Creating bucket: " + bucketName);
218288

219289
CreateBucketConfiguration bucketConfiguration = CreateBucketConfiguration.builder()
@@ -295,4 +365,187 @@ public CompletableFuture<PutObjectResponse> putObjectAsync(S3AsyncClient s3Clien
295365
}
296366
});
297367
}
368+
369+
/**
370+
* Creates an AWS IAM access key asynchronously for the specified user name.
371+
*
372+
* @param userName the name of the IAM user for whom to create the access key
373+
* @return a {@link CompletableFuture} that completes with the {@link CreateAccessKeyResponse} containing the created access key
374+
*/
375+
public CompletableFuture<CreateAccessKeyResponse> createAccessKeyAsync(String userName) {
376+
CreateAccessKeyRequest request = CreateAccessKeyRequest.builder()
377+
.userName(userName)
378+
.build();
379+
380+
return getIAMAsyncClient().createAccessKey(request)
381+
.whenComplete((response, exception) -> {
382+
if (response != null) {
383+
logger.info("Access Key Created.");
384+
} else {
385+
if (exception == null) {
386+
throw new CompletionException("An unknown error occurred while creating access key.", null);
387+
}
388+
389+
Throwable cause = exception.getCause();
390+
if (cause instanceof IamException) {
391+
throw new CompletionException("IAM error while creating access key: " + cause.getMessage(), cause);
392+
}
393+
394+
throw new CompletionException("Failed to create access key: " + exception.getMessage(), exception);
395+
}
396+
});
397+
}
398+
399+
/**
400+
* Selects an availability zone ID based on the specified AWS region.
401+
*
402+
* @return A map containing the selected availability zone details, including the zone name, zone ID, region name, and state.
403+
*/
404+
public CompletableFuture<String> selectAvailabilityZoneIdAsync() {
405+
// Request available zones
406+
DescribeAvailabilityZonesRequest zonesRequest = DescribeAvailabilityZonesRequest.builder()
407+
.build();
408+
409+
return getEc2AsyncClient().describeAvailabilityZones(zonesRequest)
410+
.thenCompose(response -> {
411+
List<AvailabilityZone> zonesList = response.availabilityZones();
412+
413+
if (zonesList.isEmpty()) {
414+
logger.info("No availability zones found.");
415+
return CompletableFuture.completedFuture(null); // Return null if no zones are found
416+
}
417+
418+
// Extract zone IDs
419+
List<String> zoneIds = zonesList.stream()
420+
.map(AvailabilityZone::zoneId) // Get the zoneId (e.g., "usw2-az1")
421+
.toList();
422+
423+
// **Prompt user synchronously** and return CompletableFuture
424+
return CompletableFuture.supplyAsync(() -> promptUserForZoneSelection(zonesList, zoneIds))
425+
.thenApply(selectedZone -> {
426+
// Return only the selected Zone ID (e.g., "usw2-az1")
427+
return selectedZone.zoneId();
428+
});
429+
})
430+
.whenComplete((result, exception) -> {
431+
if (exception == null) {
432+
if (result != null) {
433+
logger.info("Selected Availability Zone ID: " + result);
434+
} else {
435+
logger.info("No availability zone selected.");
436+
}
437+
} else {
438+
Throwable cause = exception.getCause();
439+
if (cause instanceof Ec2Exception) {
440+
throw new CompletionException("EC2 error while selecting availability zone: " + cause.getMessage(), cause);
441+
}
442+
throw new CompletionException("Failed to select availability zone: " + exception.getMessage(), exception);
443+
}
444+
});
445+
}
446+
447+
/**
448+
* Prompts the user to select an availability zone from the given list.
449+
*
450+
* @param zonesList the list of availability zones
451+
* @param zoneIds the list of zone IDs
452+
* @return the selected AvailabilityZone
453+
*/
454+
private static AvailabilityZone promptUserForZoneSelection(List<AvailabilityZone> zonesList, List<String> zoneIds) {
455+
Scanner scanner = new Scanner(System.in);
456+
int index = -1;
457+
458+
while (index < 0 || index >= zoneIds.size()) {
459+
logger.info("Select an availability zone:");
460+
IntStream.range(0, zoneIds.size()).forEach(i ->
461+
System.out.println(i + ": " + zoneIds.get(i)) // Display Zone IDs
462+
);
463+
464+
logger.info("Enter the number corresponding to your choice: ");
465+
if (scanner.hasNextInt()) {
466+
index = scanner.nextInt();
467+
} else {
468+
scanner.next(); // Consume invalid input
469+
}
470+
}
471+
472+
AvailabilityZone selectedZone = zonesList.get(index);
473+
logger.info("You selected: " + selectedZone.zoneId()); // Log Zone ID
474+
return selectedZone;
475+
}
476+
public CompletableFuture<Void> setupVPCAsync() {
477+
String cidr = "10.0.0.0/16";
478+
CreateVpcRequest vpcRequest = CreateVpcRequest.builder()
479+
.cidrBlock(cidr)
480+
.build();
481+
482+
return getEc2AsyncClient().createVpc(vpcRequest)
483+
.thenCompose(vpcResponse -> {
484+
String vpcId = vpcResponse.vpc().vpcId();
485+
486+
// Wait for VPC to be available
487+
Ec2AsyncWaiter waiter = ec2AsyncClient.waiter();
488+
DescribeVpcsRequest request = DescribeVpcsRequest.builder()
489+
.vpcIds(vpcId)
490+
.build();
491+
492+
return waiter.waitUntilVpcAvailable(request)
493+
.thenApply(waiterResponse -> vpcId);
494+
})
495+
.thenCompose(vpcId -> {
496+
// Fetch route table for VPC
497+
Filter filter = Filter.builder()
498+
.name("vpc-id")
499+
.values(vpcId)
500+
.build();
501+
502+
DescribeRouteTablesRequest describeRouteTablesRequest = DescribeRouteTablesRequest.builder()
503+
.filters(filter)
504+
.build();
505+
506+
return ec2AsyncClient.describeRouteTables(describeRouteTablesRequest)
507+
.thenApply(routeTablesResponse -> {
508+
if (routeTablesResponse.routeTables().isEmpty()) {
509+
throw new CompletionException("No route tables found for VPC.", null);
510+
}
511+
return new AbstractMap.SimpleEntry<>(vpcId, routeTablesResponse.routeTables().get(0).routeTableId());
512+
});
513+
})
514+
.thenCompose(vpcAndRouteTable -> {
515+
String vpcId = vpcAndRouteTable.getKey();
516+
String routeTableId = vpcAndRouteTable.getValue();
517+
Region region = ec2AsyncClient.serviceClientConfiguration().region();
518+
String serviceName = String.format("com.amazonaws.%s.s3express", region.id());
519+
520+
CreateVpcEndpointRequest endpointRequest = CreateVpcEndpointRequest.builder()
521+
.vpcId(vpcId)
522+
.routeTableIds(routeTableId)
523+
.serviceName(serviceName)
524+
.build();
525+
526+
return ec2AsyncClient.createVpcEndpoint(endpointRequest)
527+
.thenApply(vpcEndpointResponse -> {
528+
String vpcEndpointId = vpcEndpointResponse.vpcEndpoint().vpcEndpointId();
529+
return new AbstractMap.SimpleEntry<>(vpcId, vpcEndpointId);
530+
});
531+
})
532+
.whenComplete((result, exception) -> {
533+
if (result != null) {
534+
logger.info("Created VPC: {}", result.getKey());
535+
logger.info("Created VPC Endpoint: {}", result.getValue());
536+
} else {
537+
if (exception == null) {
538+
throw new CompletionException("An unknown error occurred during VPC setup.", null);
539+
}
540+
541+
Throwable cause = exception.getCause();
542+
if (cause instanceof Ec2Exception) {
543+
throw new CompletionException("EC2 error during VPC setup: " + cause.getMessage(), cause);
544+
}
545+
546+
throw new CompletionException("VPC setup failed: " + exception.getMessage(), exception);
547+
}
548+
})
549+
.thenAccept(v -> {}); // Ensure CompletableFuture<Void> return type
550+
}
298551
}

0 commit comments

Comments
 (0)