Commit 3aff5d5

New tutorial to copy data using python

1 parent 7be1e12 commit 3aff5d5

4 files changed: +342 −0 lines changed
---
parser: v2
author_name: Allyson Sherwood
author_profile: https://github.com/allysonsherwood
auto_validation: true
time: 15
tags: [ tutorial>beginner, software-product-function>sap-hana-cloud\,-data-lake, software-product>sap-hana-cloud, programming-tool>python]
primary_tag: software-product-function>sap-hana-cloud\,-data-lake
---

# Copy Data Between Data Lake Files Instances Using Python

<!-- description --> Copy a root directory from a source SAP HANA data lake Files instance to a target SAP HANA data lake Files instance. A Python script that uses the SAP HANA Cloud, data lake Files REST API performs the copy.

## Prerequisites

- Two running non-trial SAP HANA data lake Files instances – a source and a target
- Both instances added to the SAP HANA database explorer; [instructions to add](data-lake-hdlfs-dbx-connection)
- Read permissions on the source instance
- Read and write permissions on the target instance
- Client certificates set up on both instances, and a copy of the client certificate and client key files for each instance; [instructions to set up certificates](https://blogs.sap.com/2021/08/05/setting-up-initial-access-to-hana-cloud-data-lake-files/)
- Python 3.10 or later; [download Python](https://www.python.org/downloads/)

## You will learn

- How to copy a directory from the source data lake file container to the target data lake file container

## Intro

For simplicity, the same client certificate is used for both the source and the target instances in this tutorial.

---

### Set up a directory in the source data lake Files instance

In this step, we will create a directory called `My_Directory` with two subdirectories, `Subdirectory_1` and `Subdirectory_2`, and a file in each subdirectory. The files can be of any format; this tutorial uses text files.

1. Create and save the following text files locally, making a note of where you save them.

```File_1.txt
This is my first text file.
```

```File_2.txt
This is my second text file.
```

2. In the SAP HANA database explorer, upload these files to your source HDLFS instance. Uploading `File_1.txt` with the relative path `My_Directory/Subdirectory_1` creates the directory `My_Directory` along with the subdirectory `Subdirectory_1`.

![Upload files](upload-files.png)

![Upload first file](upload-file-1.png)

3. Upload `File_2.txt` using the relative path `My_Directory/Subdirectory_2`.

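If you prefer to script the upload rather than use the database explorer, the same files can be created through the data lake Files REST API that the rest of this tutorial relies on. The sketch below is a minimal, hypothetical example: the endpoint and certificate paths are placeholders, and the `op=CREATE` request mirrors the one the copy script issues later.

```Python
import http.client
import ssl

# Placeholder -- substitute your container's REST API endpoint (bare hostname)
ENDPOINT = '<File Container REST API>'

# Placeholders -- substitute the paths to your client certificate and key
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('<Path to client.crt>', '<Path to client.key>')

connection = http.client.HTTPSConnection(ENDPOINT, port=443, timeout=30, context=ssl_context)

# Create File_1.txt under My_Directory/Subdirectory_1 in a single request
connection.request(
    method='PUT',
    url='/webhdfs/v1/My_Directory/Subdirectory_1/File_1.txt?op=CREATE&data=true',
    body=b'This is my first text file.',
    headers={'x-sap-filecontainer': ENDPOINT.partition('.')[0],
             'Content-Type': 'application/octet-stream'})
response = connection.getresponse()
print(response.status, response.read())
```
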

### Set up a Python script

1. In this step you will need the REST API endpoints for both your source and target HDLFS instances. The REST API endpoint for a given instance can be found in the instance's action menu in SAP HANA Cloud Central.

![Copy REST API endpoint](REST-endpoint.png)

2. Create a Python script beginning with the code below and save the file as `copy_HDLFS.py`. Edit the source and target instance variables with the appropriate REST API endpoints for each of your containers, and edit the certificate variables with the path to the certificate and key used for both HDLFS containers.

```Python
# Source information
SOURCE_FILES_REST_API_ENDPOINT = '<File Container REST API>'

# Target information
TARGET_FILES_REST_API_ENDPOINT = '<File Container REST API>'

# Certificates information
CERTIFICATE_PATH = '<Path to client.crt>'
KEY_PATH = '<Path to client.key>'
```

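Since the copy script below passes the endpoint directly to `http.client.HTTPSConnection`, it should be the bare hostname without an `https://` prefix; the script derives the file container name from the first label of that hostname. A small illustration, using a made-up endpoint:

```Python
# Hypothetical endpoint -- the leading hostname label is the file container ID
endpoint = 'a1b2c3d4.files.hdl.prod-us10.hanacloud.ondemand.com'
container = endpoint.partition('.')[0]
print(container)  # prints: a1b2c3d4
```
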
### Set up a copy script

1. Append the following code to the end of your `copy_HDLFS.py` file.

```Python
# This script copies a directory including all subdirectories and files from a root directory
# in the source HDLFS instance to the target HDLFS instance.

# This script is run with the following arguments:
# root='root_dir_name'
#     where root_dir_name is the name of the root directory in the source instance
#     that is being copied
# index=i
#     (optional) where i is a non-negative integer and the index of the file in the source
#     instance that will be used as a starting point for the copy -- in other words, the first i
#     files will be skipped and thus will not be copied; if no value is given, the default index 0
#     will be used -- all files will be copied
#
# Ex. The following arguments would execute a copy from root directory 'TPCH_SF100' starting at the
# file at index 42
# py copy_HDLFS.py root='TPCH_SF100' index=42

###################################################################################################

# Importing dependencies
import http.client
import json
from datetime import datetime
import ssl
import sys

###################################################################################################

# Handling the arguments if any have been provided
# In either order, root and index can be specified by the user in the following format:
# py copy_HDLFS.py root='TPCH_SF100' index=42
def assign_arguments(arg_list):
    args = {}
    if len(arg_list) <= 3:
        for arg in arg_list:
            if arg[:6] == 'root=\'' and arg[-1] == '\'' and not ('root' in args):
                args['root'] = arg[6:-1]
            elif arg[:6] == 'index=' and not ('index' in args):
                try:
                    args['index'] = int(arg[6:])
                    assert args['index'] >= 0
                except:
                    raise Exception(f'ERROR: Invalid argument {arg}.')
            else:
                raise Exception(f'ERROR: Invalid argument {arg}.')
    else:
        raise Exception('ERROR: Too many arguments.')
    return args

argument_assignment = assign_arguments(sys.argv[1:])
if 'root' in argument_assignment:
    ROOT_DIR = argument_assignment['root']
else:
    raise Exception('ERROR: No root directory was provided. To copy the entire source instance use root=\'\'.')
if 'index' in argument_assignment:
    STARTING_INDEX = argument_assignment['index']
else:
    STARTING_INDEX = 0

###################################################################################################

# Creating an SSL context using the certificate path and key path variables
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain(CERTIFICATE_PATH, KEY_PATH)

# Creating container variables for the source and target instances
source_container = SOURCE_FILES_REST_API_ENDPOINT.partition('.')[0]
target_container = TARGET_FILES_REST_API_ENDPOINT.partition('.')[0]

# Creating connections to the source and target instances
source_connection = http.client.HTTPSConnection(
    SOURCE_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)
target_connection = http.client.HTTPSConnection(
    TARGET_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)

# Creating JSON request variables needed to access the present source and target HDLFS directories
# at the root directory provided
json_request_path = f'/{ROOT_DIR}'
json_request_url = f'/webhdfs/v1/{json_request_path}?op=LISTSTATUS_RECURSIVE'
source_json_request_headers = {
    'x-sap-filecontainer': source_container,
    'Content-Type': 'application/json'
}
target_json_request_headers = {
    'x-sap-filecontainer': target_container,
    'Content-Type': 'application/json'
}

# Creating request headers for reading and writing binary data from the source and target HDLFS
# directories
source_request_headers = {
    'x-sap-filecontainer': source_container,
    'Content-Type': 'application/octet-stream'
}
target_request_headers = {
    'x-sap-filecontainer': target_container,
    'Content-Type': 'application/octet-stream'
}

# Makes an http.client connection request and, if the request is successful, returns the data read
def fetch(fetch_connection, fetch_method, fetch_url, fetch_body, fetch_headers):
    fetch_connection.request(
        method = fetch_method,
        url = fetch_url,
        body = fetch_body,
        headers = fetch_headers)
    response = fetch_connection.getresponse()
    data = response.read()
    response.close()
    return data

###################################################################################################

# Connecting to the target instance and requesting a list of the current target HDLFS directory at
# the root directory provided
# If the connection is unsuccessful, http.client will raise an exception
print('\nConnecting to target instance...')
target_json_data = fetch(target_connection, 'GET', json_request_url,
                         None, target_json_request_headers)
target_files_dict = json.loads(target_json_data)
print('Successfully connected to target instance.\n')

# If the root directory already exists in the target instance, the user is prompted to confirm that
# they would like to proceed
if 'DirectoryListing' in target_files_dict:
    print(f'WARNING: The directory {ROOT_DIR} already exists at the target HDLFS.')
    print('Proceeding could result in overwriting files in this directory of the target instance.')
    user_input = input('Would you like to proceed? (Y/N): ')
    while user_input not in ['Y', 'N']:
        print('ERROR: Invalid response. Please enter Y or N.')
        user_input = input('Would you like to proceed? (Y/N): ')
    if user_input == 'N':
        quit()

# The start timestamp is declared
print('\nStarting copy...')
start = datetime.now()
print('Start time:\t', start, '\n')

# Connecting to the source instance and requesting a list of the current source HDLFS directory at
# the root directory provided
# If the connection is unsuccessful, http.client will raise an exception
print('Connecting to source instance...')
source_json_data = fetch(source_connection, 'GET', json_request_url,
                         None, source_json_request_headers)
source_files_dict = json.loads(source_json_data)
print('Successfully connected to source instance.\n')

# Accessing the path suffix of each file in the root directory of the source instance
source_files_paths = []
for file in source_files_dict['DirectoryListing']['partialListing']['FileStatuses']['FileStatus']:
    source_files_paths.append(file['pathSuffix'])

# Starting with the starting index provided (or the first file if no starting index was provided),
# the copy begins
cur_index = STARTING_INDEX
while cur_index < len(source_files_paths):
    try:
        file_path = source_files_paths[cur_index]
        request_path = f'/{ROOT_DIR}/{file_path}'
        offset = 0
        length = 10000000
        read_length = length
        merge_count = 0
        to_merge = {'sources': []}
        list_of_temp = []

        # While each chunk of bytes read continues to be the length of bytes requested, indicating
        # that the EOF has not been reached, more bytes are read
        while read_length == length:
            source_request_url = f'/webhdfs/v1/{request_path}?op=OPEN&offset={offset}&length={length}'
            source_data = fetch(source_connection, 'GET', source_request_url,
                                None, source_request_headers)
            read_length = len(source_data)

            # If the first request returns less than 10MB, the entire file has been read and can be
            # written to a file under the same name in the target location, without creating any
            # temporary files
            if offset == 0 and read_length < length:
                target_request_url = f'/webhdfs/v1/{request_path}?op=CREATE&data=true'
                target_data = fetch(target_connection, 'PUT', target_request_url, source_data,
                                    target_request_headers)
                print(f'Created and wrote {read_length} bytes to {request_path}')

            # Otherwise a temporary file is created for the current read and each subsequent read;
            # the files will later be merged
            else:
                merge_count += 1
                # NOTE: the temporary name assumes the source file name ends in '.parquet'
                # (8 characters)
                temp_path = request_path[:-8] + str(merge_count) + '.parquet'
                target_request_url = f'/webhdfs/v1/{temp_path}?op=CREATE&data=true'
                target_data = fetch(target_connection, 'PUT', target_request_url, source_data,
                                    target_request_headers)
                print(f'Created temporary file {temp_path}')
                list_of_temp.append(temp_path)
                temp_to_merge = {'path': temp_path}
                to_merge['sources'].append(temp_to_merge)
                offset += length

        # If there are files to merge, they are merged here and the temporary files are deleted
        if merge_count != 0:
            cur_file_bytes_read = offset + read_length
            print(f'Read and wrote {cur_file_bytes_read} bytes to {merge_count}',
                  f'temporary files for file {request_path}')

            # Creating the file that all temporary files will be merged to
            target_request_url = f'/webhdfs/v1/{request_path}?op=CREATE&data=true'
            target_data = fetch(target_connection, 'PUT', target_request_url, None,
                                target_request_headers)
            print(f'Created {request_path}')

            # Merging the files to the merge destination file
            target_request_url = f'/webhdfs/v1/{request_path}?op=MERGE&data=true'
            target_data = fetch(target_connection, 'POST', target_request_url,
                                json.dumps(to_merge), target_request_headers)
            print(f'Merged {merge_count} files to {request_path}')

            # Deleting the temporary files after the merge is complete
            to_delete = {'files': to_merge['sources']}
            target_request_url = f'/webhdfs/v1/?op=DELETE_BATCH&data=true'
            target_data = fetch(target_connection, 'POST', target_request_url,
                                json.dumps(to_delete), target_request_headers)
            print(f'Deleted {merge_count} temporary files')

    # If an exception is raised, the error is printed and arguments are provided to rerun the
    # script beginning with the file in which the error occurred
    except Exception as error:
        print(error)
        print('To rerun this script beginning with the most recently accessed file, run:',
              f'\npy {sys.argv[0]} root=\'{ROOT_DIR}\' index={cur_index}')
        quit()
    # If any other error occurs, arguments are provided to rerun the script beginning with the file
    # in which the error occurred
    except:
        print('To rerun this script beginning with the most recently accessed file, run:',
              f'\npy {sys.argv[0]} root=\'{ROOT_DIR}\' index={cur_index}')
        quit()
    else:
        cur_index += 1

end = datetime.now()
print(f'Successfully copied {ROOT_DIR} from index {STARTING_INDEX}',
      'from source instance to target instance.')
print('End time:\t', end)
print('Elapsed time:\t', end - start)
```

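Note the script's restart behavior: when a request fails, it prints the exact arguments for rerunning from the file where the error occurred. For example, resuming the copy of `My_Directory` at file index 3 (an illustrative index) would look like this:

```Terminal
py copy_HDLFS.py root='My_Directory' index=3
```
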
### Run the copy script

1. To copy the directory `My_Directory` from the source to the target HDLFS, execute the following in a command prompt.

```Terminal
py copy_HDLFS.py root='My_Directory'
```

2. Verify that `My_Directory` and its contents are now visible in the target container. A programmatic spot check is sketched below.

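As an optional check, the same listing request the copy script already uses (`op=LISTSTATUS_RECURSIVE`) can confirm the copied files from Python. This is a minimal sketch that assumes the `target_connection`, `target_json_request_headers`, and `fetch` definitions from `copy_HDLFS.py` are in scope:

```Python
import json

# List every file under My_Directory in the target container
listing = json.loads(fetch(target_connection, 'GET',
                           '/webhdfs/v1/My_Directory?op=LISTSTATUS_RECURSIVE',
                           None, target_json_request_headers))
for status in listing['DirectoryListing']['partialListing']['FileStatuses']['FileStatus']:
    print(status['pathSuffix'])
```
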
### Knowledge check

Congratulations! You have now copied the directory `My_Directory` between HDLFS instances.

---