Skip to content

Add DevRev httpClient. Gracefully handle failed attachments. #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Release Notes

### v1.1.6

- Add exponential retry and handle rate-limiting towards DevRev.
- Gracefully handle failure to upload extracted attachments.

### v1.1.5

- Increase `delayFactor` and number of retries for the exponential backoff retry mechanism for HTTP requests.
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@devrev/ts-adaas",
"version": "1.1.5",
"description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.",
"version": "1.1.6",
"description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.",
"type": "commonjs",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
Expand Down
2 changes: 1 addition & 1 deletion src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const ALLOWED_EVENT_TYPES = [
];

export const ARTIFACT_BATCH_SIZE = 2000;
export const MAX_DEVREV_ARTIFACT_SIZE = 536870912; // 512MB
export const MAX_DEVREV_ARTIFACT_SIZE = 262144000; // 250MB

export const AIRDROP_DEFAULT_ITEM_TYPES = {
EXTERNAL_DOMAIN_METADATA: 'external_domain_metadata',
Expand Down
16 changes: 10 additions & 6 deletions src/deprecated/uploader/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import axios from 'axios';
import { axiosDevRevClient } from '../../http/axios-devrev-client';
import { betaSDK, client } from '@devrev/typescript-sdk';
import fs, { promises as fsPromises } from 'fs';
import { createFormData } from '../common/helpers';
Expand Down Expand Up @@ -108,11 +108,15 @@ export class Uploader {
): Promise<any | null> {
const formData = createFormData(preparedArtifact, fetchedObjects);
try {
const response = await axios.post(preparedArtifact.url, formData, {
headers: {
'Content-Type': 'multipart/form',
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
'Content-Type': 'multipart/form',
},
}
);

return response;
} catch (error) {
Expand Down
34 changes: 34 additions & 0 deletions src/http/axios-devrev-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import axios, { AxiosError } from 'axios';
import axiosRetry from 'axios-retry';

const axiosDevRevClient = axios.create();

axiosRetry(axiosDevRevClient, {
retries: 5,
retryDelay: (retryCount, error) => {
console.warn(
'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.'
);
if (error.response) {
const retry_after = error.response?.headers['retry-after'];
if (retry_after) {
return retry_after;
}
}
// Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms
return axiosRetry.exponentialDelay(retryCount, error, 1000);
},
retryCondition: (error: AxiosError) => {
return (
axiosRetry.isNetworkOrIdempotentRequestError(error) ||
error.response?.status === 429
);
},
onMaxRetryTimesExceeded(error: AxiosError, retryCount) {
console.log(`Max retries attempted: ${retryCount}`);
delete error.config?.headers.Authorization;
delete error.request._header;
},
});

export { axios, axiosDevRevClient };
2 changes: 1 addition & 1 deletion src/repo/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class Repo {
// Add the new records to the items array
this.items.push(...recordsToPush);

console.log(
console.info(
`Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.`
);

Expand Down
45 changes: 29 additions & 16 deletions src/uploader/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { AxiosResponse } from 'axios';
import fs, { promises as fsPromises } from 'fs';
import { axios, axiosClient } from '../http/axios-client';
import { axios, axiosDevRevClient } from '../http/axios-devrev-client';
import zlib from 'zlib';
import { jsonl } from 'js-jsonl';
import FormData from 'form-data';
Expand All @@ -16,6 +15,7 @@ import {
UploaderFactoryInterface,
} from './uploader.interfaces';
import { serializeAxiosError } from '../logger/logger';
import { AxiosResponse } from 'axios';

export class Uploader {
private event: AirdropEvent;
Expand Down Expand Up @@ -123,11 +123,15 @@ export class Uploader {
formData.append('file', file);

try {
const response = await axiosClient.post(preparedArtifact.url, formData, {
headers: {
...formData.getHeaders(),
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
...formData.getHeaders(),
},
}
);

return response;
} catch (error) {
Expand All @@ -154,15 +158,24 @@ export class Uploader {

formData.append('file', fileStreamResponse.data);

if (
fileStreamResponse.headers['content-length'] > MAX_DEVREV_ARTIFACT_SIZE
) {
return;
}
try {
const response = await axiosClient.post(preparedArtifact.url, formData, {
headers: {
...formData.getHeaders(),
...(!fileStreamResponse.headers['content-length'] && {
'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
}),
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
...formData.getHeaders(),
...(!fileStreamResponse.headers['content-length'] && {
'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
}),
},
}
);
return response;
} catch (error) {
if (axios.isAxiosError(error)) {
Expand Down Expand Up @@ -239,7 +252,7 @@ export class Uploader {

private async downloadArtifact(artifactUrl: string): Promise<Buffer | void> {
try {
const response = await axiosClient.get(artifactUrl, {
const response = await axiosDevRevClient.get(artifactUrl, {
responseType: 'arraybuffer',
});

Expand Down
30 changes: 19 additions & 11 deletions src/workers/worker-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { axios } from '../http/axios-client';
import { axios } from '../http/axios-devrev-client';
import {
AirdropEvent,
ExtractorEventType,
Expand Down Expand Up @@ -650,10 +650,7 @@ export class WorkerAdapter<ConnectorState> {
if (error) {
return { error };
}
console.log(
'this.state.toDevRev?.attachmentsMetadata :>> ',
this.state.toDevRev?.attachmentsMetadata
);

if (attachments) {
const attachmentsToProcess = attachments.slice(
this.state.toDevRev?.attachmentsMetadata?.lastProcessed,
Expand Down Expand Up @@ -683,9 +680,15 @@ export class WorkerAdapter<ConnectorState> {
fileType
);
if (!preparedArtifact) {
return {
error: { message: 'Error while preparing artifact.' },
};
console.warn(
'Error while preparing artifact for attachment ID ' +
attachment.id +
'. Skipping attachment'
);
if (this.state.toDevRev) {
this.state.toDevRev.attachmentsMetadata.lastProcessed++;
}
continue;
}

const uploadedArtifact = await this.uploader.streamToArtifact(
Expand All @@ -694,9 +697,14 @@ export class WorkerAdapter<ConnectorState> {
);

if (!uploadedArtifact) {
return {
error: { message: 'Error while streaming artifact.' },
};
console.warn(
'Error while preparing artifact for attachment ID ' +
attachment.id
);
if (this.state.toDevRev) {
this.state.toDevRev.attachmentsMetadata.lastProcessed++;
}
continue;
}

const ssorAttachment: SsorAttachment = {
Expand Down
Loading