---
parser: v2
author_name: Allyson Sherwood
author_profile: https://github.com/allysonsherwood
auto_validation: true
time: 15
tags: [ tutorial>beginner, software-product-function>sap-hana-cloud\,-data-lake, software-product>sap-hana-cloud, programming-tool>python]
primary_tag: software-product-function>sap-hana-cloud\,-data-lake
---

# Copy Data Between Data Lake Files Instances Using Python
<!-- description --> Copy a root directory from a source SAP HANA data lake Files instance to a target SAP HANA data lake Files instance. A Python script that leverages the SAP HANA Cloud, data lake Files REST API is provided to perform the copy.

## Prerequisites
 - Two running non-trial SAP HANA data lake Files instances – a source and a target
 - Both instances added to SAP HANA database explorer; [instructions to add](data-lake-hdlfs-dbx-connection)
 - Read permissions on the source instance
 - Read and write permissions on the target instance
 - Client certificates set up on both instances, and a copy of the client certificate and client key files for each instance; [instructions to set up certificates](https://blogs.sap.com/2021/08/05/setting-up-initial-access-to-hana-cloud-data-lake-files/)
 - Python 3.10 or later; [download Python](https://www.python.org/downloads/)

## You will learn
 - How to copy a directory from the source data lake file container to the target data lake file container

## Intro
Note that for simplicity, the same client certificate will be used for both the target and the source instance in this tutorial.

---

### Set up a directory in the source data lake Files instance

In this step, we will create a directory called `My_Directory` with two subdirectories, `Subdirectory_1` and `Subdirectory_2`, and a file in each subdirectory. The files can be of any format; this tutorial uses text files.
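
For reference, this is the directory structure that will exist in the source instance at the end of this step:

```Text
My_Directory/
├── Subdirectory_1/
│   └── File_1.txt
└── Subdirectory_2/
    └── File_2.txt
```
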
1. Create and save the following text files locally. Take note of where you save them, as you will need their location when uploading.

    ```File_1.txt
    This is my first text file.
    ```

    ```File_2.txt
    This is my second text file.
    ```

2. In the SAP HANA database explorer, upload these files to your source HDLFS instance. Upload `File_1.txt` using the relative path `My_Directory/Subdirectory_1`, as shown below. This creates the directory `My_Directory` along with the subdirectory `Subdirectory_1`.

    

    

3. Upload `File_2.txt` using the relative path `My_Directory/Subdirectory_2`.

### Set up a Python script

1. In this step you will need to access the REST API endpoints for both your source and target HDLFS instances. The REST API endpoint for a given instance can be found by opening the actions menu for that instance in SAP HANA Cloud Central.

    

2. Create a Python script beginning with the code below and save the file as `copy_HDLFS.py`. Set the source and target variables to the REST API endpoints of your containers, and set the certificate variables to the paths of the client certificate and key used for both HDLFS containers. An optional connectivity check is sketched after the code.

    ```Python
    # Source information
    SOURCE_FILES_REST_API_ENDPOINT = '<File Container REST API>'

    # Target information
    TARGET_FILES_REST_API_ENDPOINT = '<File Container REST API>'

    # Certificates information
    CERTIFICATE_PATH = '<Path to client.crt>'
    KEY_PATH = '<Path to client.key>'
    ```
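
    To confirm that the endpoint, certificate, and key values are correct before moving on, you can optionally run the sketch below after the assignments above. It is an addition to this tutorial (not part of `copy_HDLFS.py`) and assumes the `LISTSTATUS` operation is available on the Files REST API; it simply lists the root of the source file container.

    ```Python
    # Optional sanity check (an addition, not part of the tutorial script):
    # list the root of the source file container to verify connectivity.
    import http.client
    import ssl

    ssl_context = ssl.create_default_context()
    ssl_context.load_cert_chain(CERTIFICATE_PATH, KEY_PATH)

    # The file container ID is the first label of the REST API endpoint host name
    source_container = SOURCE_FILES_REST_API_ENDPOINT.partition('.')[0]

    connection = http.client.HTTPSConnection(
        SOURCE_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)
    connection.request(
        method='GET',
        url='/webhdfs/v1/?op=LISTSTATUS',
        headers={'x-sap-filecontainer': source_container, 'Content-Type': 'application/json'})
    response = connection.getresponse()
    print(response.status, response.read().decode())
    connection.close()
    ```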
|
### Set up a copy script

1. Append the following code to the end of your `copy_HDLFS.py` file. An optional, stricter variant of the script's `fetch` helper is sketched after the code.

    ```Python
    # This script copies a directory, including all subdirectories and files, from a root directory
    # in the source HDLFS instance to the target HDLFS instance.

    # This script is run with the following arguments:
    #   root='root_dir_name'
    #       where root_dir_name is the name of the root directory in the source instance
    #       that is being copied
    #   index=i
    #       (optional) where i is a non-negative integer and the index of the file in the source
    #       instance that will be used as a starting point for the copy -- in other words, the first i
    #       files will be skipped and thus will not be copied; if no value is given, the default index 0
    #       will be used -- all files will be copied
    #
    # Ex. The following arguments would execute a copy from root directory 'TPCH_SF100' starting at the
    # file at index 42
    #   py copy_HDLFS.py root='TPCH_SF100' index=42

    ###################################################################################################

    # Importing dependencies
    import http.client
    import json
    from datetime import datetime
    import ssl
    import sys

    ###################################################################################################

    # Handling the arguments if either has been provided
    # In either order, root and index can be specified by the user in the following format:
    #   py copy_HDLFS.py root='TPCH_SF100' index=42
    def assign_arguments(arg_list):
        args = {}
        if len(arg_list) <= 3:
            for arg in arg_list:
                if arg[:6] == 'root=\'' and arg[-1] == '\'' and not ('root' in args):
                    args['root'] = arg[6:-1]
                elif arg[:6] == 'index=' and not ('index' in args):
                    try:
                        args['index'] = int(arg[6:])
                        assert args['index'] >= 0
                    except:
                        raise Exception(f'ERROR: Invalid argument {arg}.')
                else:
                    raise Exception(f'ERROR: Invalid argument {arg}.')
        else:
            raise Exception('ERROR: Too many arguments.')
        return args

    argument_assignment = assign_arguments(sys.argv[1:])
    if 'root' in argument_assignment:
        ROOT_DIR = argument_assignment['root']
    else:
        raise Exception('ERROR: No root directory was provided. To copy the entire source instance use root=\'\'.')
    if 'index' in argument_assignment:
        STARTING_INDEX = argument_assignment['index']
    else:
        STARTING_INDEX = 0

    ###################################################################################################

    # Creating an SSL context using the certificate path and key path variables
    ssl_context = ssl.create_default_context()
    ssl_context.load_cert_chain(CERTIFICATE_PATH, KEY_PATH)

    # Creating container variables for the source and target instances
    source_container = SOURCE_FILES_REST_API_ENDPOINT.partition('.')[0]
    target_container = TARGET_FILES_REST_API_ENDPOINT.partition('.')[0]

    # Creating connections to the source and target instances
    source_connection = http.client.HTTPSConnection(
        SOURCE_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)
    target_connection = http.client.HTTPSConnection(
        TARGET_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)

    # Creating JSON request variables needed to access the present source and target HDLFS directories
    # at the root directory provided
    json_request_path = f'/{ROOT_DIR}'
    json_request_url = f'/webhdfs/v1/{json_request_path}?op=LISTSTATUS_RECURSIVE'
    source_json_request_headers = {
        'x-sap-filecontainer': source_container,
        'Content-Type': 'application/json'
    }
    target_json_request_headers = {
        'x-sap-filecontainer': target_container,
        'Content-Type': 'application/json'
    }

    # Creating request headers for reading and writing binary data from the source and target HDLFS
    # directories
    source_request_headers = {
        'x-sap-filecontainer': source_container,
        'Content-Type': 'application/octet-stream'
    }
    target_request_headers = {
        'x-sap-filecontainer': target_container,
        'Content-Type': 'application/octet-stream'
    }

    # http.client connection requests are made and, if the request is successful, the read data
    # is returned
    def fetch(fetch_connection, fetch_method, fetch_url, fetch_body, fetch_headers):
        fetch_connection.request(
            method = fetch_method,
            url = fetch_url,
            body = fetch_body,
            headers = fetch_headers)
        response = fetch_connection.getresponse()
        data = response.read()
        response.close()
        return data

    ###################################################################################################

    # Connecting to the target instance and requesting a list of the current target HDLFS directory at
    # the root directory provided
    # If the connection is unsuccessful, http.client will raise an exception
    print('\nConnecting to target instance...')
    target_json_data = fetch(target_connection, 'GET', json_request_url,
        None, target_json_request_headers)
    target_files_dict = json.loads(target_json_data)
    print('Successfully connected to target instance.\n')

    # If the root directory already exists in the target instance, the user is prompted to confirm that
    # they would like to proceed
    if 'DirectoryListing' in target_files_dict:
        print(f'WARNING: The directory {ROOT_DIR} already exists at the target HDLFS.')
        print('Proceeding could result in overwriting files in this directory of the target instance.')
        user_input = input('Would you like to proceed? (Y/N): ')
        while user_input not in ['Y', 'N']:
            print('ERROR: Invalid response. Please enter Y or N.')
            user_input = input('Would you like to proceed? (Y/N): ')
        if user_input == 'N':
            quit()

    # The start timestamp is declared
    print('\nStarting copy...')
    start = datetime.now()
    print('Start time:\t', start, '\n')

    # Connecting to the source instance and requesting a list of the current source HDLFS directory at
    # the root directory provided
    # If the connection is unsuccessful, http.client will raise an exception
    print('Connecting to source instance...')
    source_json_data = fetch(source_connection, 'GET', json_request_url,
        None, source_json_request_headers)
    source_files_dict = json.loads(source_json_data)
    print('Successfully connected to source instance.\n')

    # Accessing the path suffix of each file in the root directory of the source instance
    source_files_paths = []
    for file in source_files_dict['DirectoryListing']['partialListing']['FileStatuses']['FileStatus']:
        source_files_paths.append(file['pathSuffix'])

    # Starting with the starting index provided (or the first file if no starting index was provided),
    # the copy begins
    cur_index = STARTING_INDEX
    while cur_index < len(source_files_paths):
        try:
            file_path = source_files_paths[cur_index]
            request_path = f'/{ROOT_DIR}/{file_path}'
            offset = 0
            length = 10000000
            read_length = length
            merge_count = 0
            to_merge = {'sources': []}
            list_of_temp = []

            # While each chunk of bytes read continues to be the length of bytes requested, indicating
            # that the EOF has not been reached, more bytes are read
            while read_length == length:
                source_request_url = f'/webhdfs/v1/{request_path}?op=OPEN&offset={offset}&length={length}'
                source_data = fetch(source_connection, 'GET', source_request_url,
                    None, source_request_headers)
                read_length = len(source_data)

                # If the first request returns less than 10MB, the entire file has been read and can be
                # written to a file under the same name in the target location, without creating any
                # temporary files
                if offset == 0 and read_length < length:
                    target_request_url = f'/webhdfs/v1/{request_path}?op=CREATE&data=true'
                    target_data = fetch(target_connection, 'PUT', target_request_url, source_data,
                        target_request_headers)
                    print(f'Created and wrote {read_length} bytes to {request_path}')

                # Otherwise a temporary file is created for the current read and each subsequent read;
                # the files will later be merged
                else:
                    merge_count += 1
                    temp_path = request_path[:-8] + str(merge_count) + '.parquet'
                    target_request_url = f'/webhdfs/v1/{temp_path}?op=CREATE&data=true'
                    target_data = fetch(target_connection, 'PUT', target_request_url, source_data,
                        target_request_headers)
                    print(f'Created temporary file {temp_path}')
                    list_of_temp.append(temp_path)
                    temp_to_merge = {'path': temp_path}
                    to_merge['sources'].append(temp_to_merge)
                offset += length

            # If there are files to merge, they are merged here and the temporary files are deleted
            if merge_count != 0:
                cur_file_bytes_read = offset + read_length
                print(f'Read and wrote {cur_file_bytes_read} bytes to {merge_count}',
                    f'temporary files for file {request_path}')

                # Creating the file where we will merge all temporary files to
                target_request_url = f'/webhdfs/v1/{request_path}?op=CREATE&data=true'
                target_data = fetch(target_connection, 'PUT', target_request_url, None,
                    target_request_headers)
                print(f'Created {request_path}')

                # Merging the files to the merge destination file
                target_request_url = f'/webhdfs/v1/{request_path}?op=MERGE&data=true'
                target_data = fetch(target_connection, 'POST', target_request_url,
                    json.dumps(to_merge), target_request_headers)
                print(f'Merged {merge_count} files to {request_path}')

                # Deleting the temporary files after the merge is complete
                to_delete = {'files': to_merge['sources']}
                target_request_url = '/webhdfs/v1/?op=DELETE_BATCH&data=true'
                target_data = fetch(target_connection, 'POST', target_request_url,
                    json.dumps(to_delete), target_request_headers)
                print(f'Deleted {merge_count} temporary files')

        # If an exception is raised, the error is printed and arguments are provided to rerun the
        # script beginning with the file in which the error occurred
        except Exception as error:
            print(error)
            print('To rerun this script beginning with the most recently accessed file, run:',
                f'\npy {sys.argv[0]} root=\'{ROOT_DIR}\' index={cur_index}')
            quit()
        # If any other error occurs, arguments are provided to rerun the script beginning with the file
        # in which the error occurred
        except:
            print('To rerun this script beginning with the most recently accessed file, run:',
                f'\npy {sys.argv[0]} root=\'{ROOT_DIR}\' index={cur_index}')
            quit()
        else:
            cur_index += 1

    end = datetime.now()
    print(f'Successfully copied {ROOT_DIR} from index {STARTING_INDEX}',
        'from source instance to target instance.')
    print('End time:\t', end)
    print('Elapsed time:\t', end - start)
    ```
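
    The `fetch` helper above returns the response body without inspecting the HTTP status code. If you would like the copy to fail fast when the Files REST API reports an error, a stricter variant such as the following could be used in its place. This is a sketch and an addition to the tutorial, not part of the original script.

    ```Python
    # Optional stricter variant of the fetch helper (an addition, not part of the
    # original script): raises an exception when the REST API returns an error status.
    def fetch_checked(fetch_connection, fetch_method, fetch_url, fetch_body, fetch_headers):
        fetch_connection.request(
            method = fetch_method,
            url = fetch_url,
            body = fetch_body,
            headers = fetch_headers)
        response = fetch_connection.getresponse()
        data = response.read()
        status = response.status
        response.close()
        if status >= 400:
            raise Exception(f'ERROR: {fetch_method} {fetch_url} returned HTTP {status}: {data[:200]}')
        return data
    ```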
|
### Run the copy script

1. To copy the directory `My_Directory` from the source to the target HDLFS, run the following from a command prompt (a note on resuming an interrupted copy follows the command).

    ```Terminal
    py copy_HDLFS.py root='My_Directory'
    ```
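
    Depending on how Python is installed on your system, you may need to use `python` or `python3` in place of `py`. If a copy is interrupted, the script prints the exact arguments needed to resume; for example, to restart the copy of `My_Directory` while skipping the first three files (an illustrative index), you would run:

    ```Terminal
    py copy_HDLFS.py root='My_Directory' index=3
    ```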
|
2. Verify that `My_Directory` and its contents are now visible in the target container. You can check this in database explorer, or through the REST API as sketched below.
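
    The following optional sketch (an addition to this tutorial) lists the copied directory through the Files REST API, mirroring the listing request used in `copy_HDLFS.py`. The placeholder values are assumptions; replace them with the endpoint, certificate, and key of your target instance.

    ```Python
    # Optional verification sketch: recursively list the copied directory in the
    # target file container and print every file path found.
    import http.client
    import json
    import ssl

    TARGET_FILES_REST_API_ENDPOINT = '<File Container REST API>'
    CERTIFICATE_PATH = '<Path to client.crt>'
    KEY_PATH = '<Path to client.key>'
    ROOT_DIR = 'My_Directory'

    ssl_context = ssl.create_default_context()
    ssl_context.load_cert_chain(CERTIFICATE_PATH, KEY_PATH)
    target_container = TARGET_FILES_REST_API_ENDPOINT.partition('.')[0]

    connection = http.client.HTTPSConnection(
        TARGET_FILES_REST_API_ENDPOINT, port=443, timeout=30, context=ssl_context)
    connection.request(
        method='GET',
        url=f'/webhdfs/v1/{ROOT_DIR}?op=LISTSTATUS_RECURSIVE',
        headers={'x-sap-filecontainer': target_container, 'Content-Type': 'application/json'})
    listing = json.loads(connection.getresponse().read())
    for file in listing['DirectoryListing']['partialListing']['FileStatuses']['FileStatus']:
        print(file['pathSuffix'])
    connection.close()
    ```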
|
|
### Knowledge check

Congratulations! You have now copied the directory `My_Directory` between HDLFS instances.

---