Update attachments extraction phase #229
Merged
patricijabrecko
merged 4 commits into
main
from
patricijabrecko/update-airdrop-attachments-extraction
May 19, 2025
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,94 +1,123 @@ | ||
In the attachment extraction phase, the snap-in has to upload each attachment to DevRev and associate it with its parent data object. | ||
During the attachments extraction phase, the snap-in retrieves attachments from the external system and uploads them to DevRev. This phase occurs after data extraction, transformation, and loading are completed. | ||
|
||
## Triggering event | ||
|
||
Airdrop initiates the attachment extraction by starting the snap-in with a message with an event of | ||
type `EXTRACTION_ATTACHMENTS_START`. | ||
This is done after the data extraction, transformation, and loading into DevRev are completed. | ||
```mermaid | ||
sequenceDiagram | ||
participant Airdrop | ||
participant Snap-in | ||
participant ExternalSystem | ||
|
||
Airdrop->>Snap-in: EXTRACTION_ATTACHMENTS_START | ||
|
||
alt Success path | ||
Snap-in->>ExternalSystem: Request attachments | ||
ExternalSystem->>Snap-in: Return attachments | ||
Snap-in->>Airdrop: EXTRACTION_ATTACHMENTS_DONE | ||
|
||
else Runtime limit reached | ||
Snap-in->>Airdrop: EXTRACTION_ATTACHMENTS_PROGRESS | ||
Airdrop->>Snap-in: EXTRACTION_ATTACHMENTS_CONTINUE | ||
Note over Snap-in,Airdrop: Process continues where it left off | ||
|
||
else Rate limiting required | ||
Snap-in->>Airdrop: EXTRACTION_ATTACHMENTS_DELAY (with back-off time) | ||
Note over Airdrop: Waits for specified time | ||
Airdrop->>Snap-in: EXTRACTION_ATTACHMENTS_CONTINUE | ||
Note over Snap-in,Airdrop: Process resumes after delay | ||
|
||
else Error occurs | ||
Snap-in->>Airdrop: EXTRACTION_ATTACHMENTS_ERROR | ||
Note over Snap-in,Airdrop: Process terminates | ||
end | ||
``` | ||
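The four branches of the diagram can also be read as a mapping from the snap-in's response to what Airdrop does next. The following is a minimal sketch of that mapping, not SDK code: the event names come from the diagram, while the `SnapInResponse` and `AirdropAction` types and the `nextAirdropAction` function are hypothetical names introduced only for illustration.

```typescript
// Hypothetical model of the diagram above. Event names are from the diagram;
// all type and function names here are illustrative assumptions.
type SnapInResponse =
  | { type: "EXTRACTION_ATTACHMENTS_DONE" }
  | { type: "EXTRACTION_ATTACHMENTS_PROGRESS" }
  | { type: "EXTRACTION_ATTACHMENTS_DELAY"; delaySeconds: number }
  | { type: "EXTRACTION_ATTACHMENTS_ERROR"; message: string };

type AirdropAction =
  | { kind: "finish" }
  | { kind: "continue"; afterSeconds: number }
  | { kind: "abort"; reason: string };

function nextAirdropAction(response: SnapInResponse): AirdropAction {
  switch (response.type) {
    case "EXTRACTION_ATTACHMENTS_DONE":
      // Success path: the phase is complete.
      return { kind: "finish" };
    case "EXTRACTION_ATTACHMENTS_PROGRESS":
      // Runtime limit reached: Airdrop sends CONTINUE without waiting.
      return { kind: "continue", afterSeconds: 0 };
    case "EXTRACTION_ATTACHMENTS_DELAY":
      // Rate limited: Airdrop waits for the back-off time, then sends CONTINUE.
      return { kind: "continue", afterSeconds: response.delaySeconds };
    case "EXTRACTION_ATTACHMENTS_ERROR":
      // Error: the process terminates.
      return { kind: "abort", reason: response.message };
  }
}
```

In this reading, `PROGRESS` and `DELAY` differ only in how long Airdrop waits before sending `EXTRACTION_ATTACHMENTS_CONTINUE`.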
|
||
During the attachment extraction phase, | ||
the snap-in extracts attachments from the external system and uploads them as artifacts to DevRev. | ||
### Event types | ||
|
||
The snap-in must respond to Airdrop with a message with an event of type | ||
`EXTRACTION_ATTACHMENTS_PROGRESS` together with an optional progress estimate | ||
when the maximum snap-in runtime (13 minutes) has been reached. | ||
| Event | Direction | Description | | ||
|-------|-----------|-------------| | ||
| `EXTRACTION_ATTACHMENTS_START` | Airdrop → Snap-in | Initiates the attachments extraction | | ||
| `EXTRACTION_ATTACHMENTS_PROGRESS` | Snap-in → Airdrop | Indicates that the process is ongoing but the runtime limit (13 minutes) has been reached | | ||
| `EXTRACTION_ATTACHMENTS_DELAY` | Snap-in → Airdrop | Requests a delay due to rate limiting from external system | | ||
| `EXTRACTION_ATTACHMENTS_CONTINUE` | Airdrop → Snap-in | Resumes the extraction process after progress update or delay | | ||
| `EXTRACTION_ATTACHMENTS_DONE` | Snap-in → Airdrop | Signals successful completion of attachments extraction | | ||
| `EXTRACTION_ATTACHMENTS_ERROR` | Snap-in → Airdrop | Indicates that an error occurred during extraction | | ||
|
||
The snap-in must respond to Airdrop with a message with an event of type `EXTRACTION_ATTACHMENTS_DELAY` | ||
and specify a back-off time when the extraction has been rate-limited by the external system and | ||
back-off is required. | ||
## Implementation | ||
|
||
In both cases, Airdrop starts the snap-in with a message with an event of type | ||
`EXTRACTION_ATTACHMENTS_CONTINUE`. | ||
The restart is immediate in case of `EXTRACTION_ATTACHMENTS_PROGRESS`, or delayed in case of | ||
`EXTRACTION_ATTACHMENTS_DELAY`. | ||
### Default implementation | ||
|
||
Once the attachment extraction phase is done, the snap-in must respond to Airdrop with a message | ||
with an event of type `EXTRACTION_ATTACHMENTS_DONE`. | ||
The SDK provides a default implementation for attachments extraction. If the default behavior (iterating through attachment metadata and uploading from saved URLs) meets your needs, **no additional implementation is required**. | ||
|
||
If attachment extraction fails the snap-in must respond to Airdrop with a message with an event of | ||
type `EXTRACTION_ATTACHMENTS_ERROR`. | ||
### Custom implementation | ||
|
||
## Implementation | ||
If you need to customize the attachments extraction, modify the implementation in `attachments-extraction.ts`. | ||
Use the `streamAttachments` method of the `WorkerAdapter` class, which handles most of the functionality needed for this phase: | ||
|
||
Attachments extraction is already provided by SDK, but if you need to customize it for your use case, | ||
it should be implemented in the [attachments-extraction.ts](https://github.com/devrev/airdrop-template/blob/main/code/src/functions/extraction/workers/attachments-extraction.ts) file. | ||
|
||
After uploading an attachment or a batch of attachments, the extractor also has to prepare and | ||
upload a file specifying the extracted and uploaded attachments. | ||
```typescript | ||
const response = await adapter.streamAttachments({ | ||
stream: getFileStream, | ||
batchSize: 10 | ||
}); | ||
``` | ||
|
||
It should contain the DevRev IDs of the extracted and uploaded attachments, along with the parent | ||
domain object ID from the external system and the actor ID from the external system. | ||
Parameters: | ||
- `stream`: (Required) Function that handles downloading attachments from the external system | ||
- `batchSize`: (Optional) Number of attachments to process simultaneously (default: 1) | ||
|
||
Increasing the batch size (from the default 1) can significantly improve performance. However, be mindful of lambda memory constraints and external system rate limits when choosing a batch size. A batch size between 10 and 50 typically provides good results. | ||
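To make the trade-off concrete, here is a minimal sketch of what batched processing generally looks like. It assumes that items within a batch run concurrently and that batches run one after another; this is an illustration of the idea, not the SDK's actual implementation, and `processInBatches` is a hypothetical helper name.

```typescript
// Hypothetical helper illustrating batched processing. This is NOT the SDK's
// code; it only shows why batchSize bounds concurrency and memory use.
async function processInBatches<T, R>(
  items: readonly T[],
  batchSize: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error("batchSize must be a positive integer");
  }
  const results: R[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    const batch = items.slice(start, start + batchSize);
    // At most `batchSize` workers are in flight at any time.
    results.push(...(await Promise.all(batch.map(worker))));
  }
  return results;
}
```

Under this model, a larger batch size means more simultaneous downloads held in memory and more simultaneous requests against the external system, which is why both memory and rate limits constrain the choice.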
|
||
```typescript Example 'stream' function | ||
async function getFileStream({ | ||
item, | ||
}: ExternalSystemAttachmentStreamingParams): Promise<ExternalSystemAttachmentStreamingResponse> { | ||
const { id, url } = item; | ||
|
||
try { | ||
const fileStreamResponse = await axiosClient.get(url, { | ||
responseType: 'stream', | ||
headers: { | ||
'Accept-Encoding': 'identity', | ||
}, | ||
}); | ||
|
||
return { httpStream: fileStreamResponse }; | ||
} catch (error) { | ||
if (axios.isAxiosError(error)) { | ||
console.warn(`Error while fetching attachment ${id} from URL.`, serializeAxiosError(error)); | ||
console.warn('Failed attachment metadata', item); | ||
} else { | ||
console.warn(`Error while fetching attachment ${id} from URL.`, error); | ||
console.warn('Failed attachment metadata', item); | ||
} | ||
|
||
return { | ||
error: { | ||
message: `Failed to fetch attachment ${id} from URL.`, | ||
}, | ||
}; | ||
} | ||
} | ||
``` | ||
|
||
The uploaded artifact is structured like a normal artifact containing extracted data in JSON Lines | ||
(JSONL) format and requires specifying `ssor_attachment` as the item type. | ||
## Emitting responses | ||
|
||
The snap-in must respond to Airdrop with a message, that either signals success, a delay, or an error. | ||
The snap-in must send exactly one response to Airdrop when extraction is complete: | ||
|
||
```typescript Success | ||
```typescript Success response | ||
await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone); | ||
``` | ||
|
||
```typescript Delay | ||
```typescript Delay response (for rate limiting) | ||
await adapter.emit(ExtractorEventType.ExtractionAttachmentsDelay, { | ||
delay: "30", | ||
delay: "30", // Delay in seconds | ||
}); | ||
``` | ||
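The delay value has to come from somewhere. One common source is the standard HTTP `Retry-After` response header, which carries either a number of seconds or an HTTP date. The sketch below assumes the external system sends that header, which may not be the case. `retryAfterToDelaySeconds` is a hypothetical helper, and the 30-second fallback simply mirrors the example above.

```typescript
// Hypothetical helper: converts an HTTP Retry-After header value into the
// string-of-seconds form used in the delay response above.
function retryAfterToDelaySeconds(
  retryAfter: string | undefined,
  fallbackSeconds = 30, // assumption: mirrors the "30" used in the example
  nowMs: number = Date.now(),
): string {
  if (retryAfter !== undefined) {
    const trimmed = retryAfter.trim();
    // Form 1: delay in seconds, e.g. "120".
    if (/^\d+$/.test(trimmed)) {
      return String(Number(trimmed));
    }
    // Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
    const dateMs = Date.parse(trimmed);
    if (!Number.isNaN(dateMs)) {
      return String(Math.max(0, Math.ceil((dateMs - nowMs) / 1000)));
    }
  }
  // Header missing or unparseable: use the fallback.
  return String(fallbackSeconds);
}
```

The returned string could then be passed as the `delay` field when emitting `ExtractionAttachmentsDelay`.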
|
||
```typescript Error | ||
```typescript Error response | ||
await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, { | ||
error: "Informative error message", | ||
}); | ||
``` | ||
|
||
<Note>The snap-in must always emit a single message.</Note> | ||
|
||
## Examples | ||
|
||
Here is an example of an SSOR attachment file: | ||
|
||
```json lines | ||
{ | ||
"id": { | ||
"devrev": "don:core:dvrv-us-1:devo/1:artifact/1", // DON of the artifact, that S3interact returned | ||
"external": "111" // ID of the artifact in the external service | ||
}, | ||
"parent_id": { | ||
"external": "1111" // ID of the parent object in the external service | ||
}, | ||
"actor_id": { | ||
"external": "11111" // ID of the actor that uploaded/modified the artifact in the external service | ||
} | ||
} | ||
{ | ||
"id": { | ||
"devrev": "don:core:dvrv-us-1:devo/1:artifact/2", | ||
"external": "222" | ||
}, | ||
"parent_id": { | ||
"external": "2222" | ||
}, | ||
"actor_id": { | ||
"external": "22222" | ||
} | ||
} | ||
``` | ||
<Note>The snap-in must always emit exactly one response event.</Note> |
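One way to guard against emitting twice by accident is to wrap the emit call so that a second attempt fails loudly. The following is a minimal sketch under that assumption. The `Emit` type and the `emitOnce` wrapper are hypothetical and not part of the SDK; the `Emit` signature is a simplification of whatever `adapter.emit` actually accepts.

```typescript
// Hypothetical guard, not SDK code: allows exactly one emit per invocation.
type Emit = (eventType: string, data?: unknown) => Promise<void>;

function emitOnce(emit: Emit): Emit {
  let emitted = false;
  return async (eventType, data) => {
    if (emitted) {
      throw new Error(`Response already emitted; refusing to emit ${eventType}`);
    }
    // Mark before awaiting so a concurrent second call is also rejected.
    emitted = true;
    await emit(eventType, data);
  };
}
```

A worker could create the guarded function once at startup and use it for the success, delay, and error paths alike, so that a code path which falls through after an error cannot also emit a success.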