Skip to content

Commit 6d98339

Browse files
joe-plempira
authored andcommitted
chore: more demonstrative examples
1 parent 49e5f23 commit 6d98339

File tree

7 files changed

+383
-0
lines changed

7 files changed

+383
-0
lines changed

examples/governance/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Subscriber Example: Governance
2+
3+
This example demonstrates how to use the `AlgorandSubscriber` to parse governance commitment transactions. Every 10 seconds, the subscriber will print out all of the governance commitments made since the last sync. The subscriber in this example uses `"sync_behaviour": "catchup-with-indexer"` to catch up because we are expecting to have a large amount of transactions with a common note prefix. This is an example of where the indexer's server-side filtering is useful. It should be noted that the exact same behavior can be achieved without indexer using algod, but indexer allows for a quicker catchup with fewer API calls. This example also only polls the chain every 10 seconds, since it is primarily useful for historical data and we don't care about live data.
4+
5+
## Governance Prefix
6+
7+
This example uses the `af/gov1` governance prefix to find governance transactions. For more information on Algorand Governance transactions, see the [Govenor's Guide](https://forum.algorand.org/t/governors-guide-2021-2024/12013).
8+
9+
## Running the example
10+
11+
To run the example, execute the following commands:
12+
13+
### Install dependencies
14+
15+
```bash
16+
poetry install
17+
```
18+
19+
### Run the script
20+
21+
```bash
22+
poetry run python governance.py
23+
```

examples/governance/governance.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import base64
2+
import json
3+
import random
4+
5+
from algokit_subscriber.subscriber import AlgorandSubscriber
6+
from algokit_subscriber.types.subscription import SubscribedTransaction
7+
from algokit_utils.beta.algorand_client import AlgorandClient
8+
from algokit_utils.beta.composer import PayParams
9+
10+
algorand = AlgorandClient.default_local_net()
11+
12+
13+
dispenser = algorand.account.localnet_dispenser()
14+
sender = algorand.account.random()
15+
16+
# Fund the sender
17+
algorand.send.payment(
18+
PayParams(sender=dispenser.address, receiver=sender.address, amount=1_000_000)
19+
)
20+
21+
# Send a governance commitment message
22+
algorand.send.payment(
23+
PayParams(
24+
sender=sender.address,
25+
receiver=sender.address,
26+
amount=0,
27+
# Commit a random amount of ALGO
28+
note=f'af/gov1:j{{"com":{random.randint(1_000_000, 100_000_000)}}}'.encode(),
29+
)
30+
)
31+
32+
# Send an unrelated message
33+
algorand.send.payment(
34+
PayParams(
35+
sender=sender.address,
36+
receiver=sender.address,
37+
amount=0,
38+
note=b"Some random txn",
39+
)
40+
)
41+
42+
# Every subscriber instance uses a water to track what block it processed last.
43+
# In this example we are using a variable to track the watermark
44+
45+
watermark = 0
46+
47+
48+
# To implement a watermark in the subscriber, we must define a get and set function
49+
def get_watermark() -> int:
50+
"""
51+
Get the current watermark value
52+
"""
53+
return watermark
54+
55+
56+
def set_watermark(new_watermark: int) -> None:
57+
"""
58+
Set our watermark variable to the new watermark from the subscriber
59+
"""
60+
global watermark # noqa: PLW0603
61+
watermark = new_watermark
62+
63+
64+
subscriber = AlgorandSubscriber(
65+
algod_client=algorand.client.algod,
66+
indexer_client=algorand.client.indexer,
67+
config={
68+
"filters": [
69+
{
70+
"name": "Governance",
71+
# Only match non-zero USDC transfers
72+
"filter": {
73+
"type": "pay",
74+
"note_prefix": "af/gov1:j",
75+
},
76+
},
77+
],
78+
# Instead of always waiting for the next block, just poll for new blocks every 10 seconds
79+
"wait_for_block_when_at_tip": False,
80+
"frequency_in_seconds": 10,
81+
# The watermark persistence functions are used to get and set the watermark
82+
"watermark_persistence": {"get": get_watermark, "set": set_watermark},
83+
# Indexer has the ability to filter transactions server-side, resulting in less API calls
84+
# This is only useful if we have a very specific query, such as a note prefix
85+
"sync_behaviour": "catchup-with-indexer",
86+
},
87+
)
88+
89+
90+
def print_transfer(transaction: SubscribedTransaction, _: str) -> None:
91+
"""
92+
This is an EventListener callback. We use the .on function below to attach this callback to specific events.
93+
94+
Every EventListener callback will receive two arguments:
95+
* The transaction data
96+
* The filter name (from the 'filters' list) that the transaction matched
97+
"""
98+
json_data = (
99+
base64.b64decode(transaction["note"])
100+
.decode()
101+
.split(":j")[1]
102+
.replace("”", '"')
103+
.replace("“", '"')
104+
)
105+
106+
amount = json.loads(json_data)["com"] * 1e-6
107+
108+
print(
109+
f"Transaction {transaction['sender']} committed {amount} ALGO on round {transaction['confirmed-round']} in transaction {transaction['id']}"
110+
)
111+
112+
113+
# Attach the callback to the events we are interested in
114+
subscriber.on("Governance", print_transfer)
115+
116+
# Start the subscriber
117+
subscriber.start()

examples/live_monitoring/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Subscriber Example: Live Monitoring
2+
3+
This example demonstrates how to use the `AlgorandSubscriber` to get live transactions from the Algorand blockchain.
4+
5+
Each round, the subscriber will print out all of the USDC and ALGO transactions that have ocurred.
6+
7+
This is an example of using the subscriber for live monitoring where you don't care about historical data. This behavior is primarily driven by the `"sync_behaviour": "skip-sync-newest"` configuration which skips syncing older blocks. Since we don't care about historical data, the watermark of the last round processed is not persisted and only a non-archival algod is required for the subscriber to function. This makes this setup lightweight with low infrastructure requirements.
8+
9+
## Running the example
10+
11+
To run the example, execute the following commands:
12+
13+
### Install dependencies
14+
15+
```bash
16+
poetry install
17+
```
18+
19+
### Run the script
20+
21+
```bash
22+
poetry run python live_monitoring.py
23+
```
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from algokit_subscriber.subscriber import AlgorandSubscriber
2+
from algokit_subscriber.types.subscription import SubscribedTransaction
3+
from algokit_utils.beta.algorand_client import AlgorandClient
4+
5+
algorand = AlgorandClient.main_net()
6+
7+
# Every subscriber instance uses a water to track what block it processed last.
8+
# In this example we are using a variable to track the watermark
9+
10+
watermark = 0
11+
12+
13+
# To implement a watermark in the subscriber, we must define a get and set function
14+
def get_watermark() -> int:
15+
"""
16+
Get the current watermark value
17+
"""
18+
return watermark
19+
20+
21+
def set_watermark(new_watermark: int) -> None:
22+
"""
23+
Set our watermark variable to the new watermark from the subscriber
24+
"""
25+
global watermark # noqa: PLW0603
26+
watermark = new_watermark
27+
28+
29+
subscriber = AlgorandSubscriber(
30+
algod_client=algorand.client.algod,
31+
config={
32+
"filters": [
33+
{
34+
"name": "USDC",
35+
# Only match non-zero USDC transfers
36+
"filter": {
37+
"type": "axfer",
38+
"asset_id": 31566704, # mainnet usdc
39+
"min_amount": 1,
40+
},
41+
},
42+
{
43+
"name": "ALGO",
44+
# Only match non-zero ALGO transfers
45+
"filter": {
46+
"type": "pay",
47+
"min_amount": 1,
48+
},
49+
},
50+
],
51+
# Once we are caught up, always wait until the next block is available and process it immediately once available
52+
"wait_for_block_when_at_tip": True,
53+
# The watermark persistence functions are used to get and set the watermark
54+
"watermark_persistence": {"get": get_watermark, "set": set_watermark},
55+
# Skip the sync process and immediately get the latest block in the network
56+
"sync_behaviour": "skip-sync-newest",
57+
# Max rounds to sync defines how many rounds to lookback when first starting the subscriber
58+
# If syncing via a non-archival node, this could be up to 1000 rounds back
59+
# In this example we want to immediately start processing the latest block without looking back
60+
"max_rounds_to_sync": 1,
61+
},
62+
)
63+
64+
65+
def print_transfer(transaction: SubscribedTransaction, filter_name: str) -> None:
66+
"""
67+
This is an EventListener callback. We use the .on function below to attach this callback to specific events.
68+
69+
Every EventListener callback will receive two arguments:
70+
* The transaction data
71+
* The filter name (from the 'filters' list) that the transaction matched
72+
"""
73+
if filter_name == "USDC":
74+
details = transaction["asset-transfer-transaction"]
75+
elif filter_name == "ALGO":
76+
details = transaction["payment-transaction"]
77+
78+
print(
79+
f"{transaction['sender']} sent {details['receiver']} {details['amount'] * 1e-6} {filter_name} in transaction {transaction['id']}"
80+
)
81+
82+
83+
# Attach the callback to the events we are interested in
84+
subscriber.on("ALGO", print_transfer)
85+
subscriber.on("USDC", print_transfer)
86+
87+
# Start the subscriber
88+
subscriber.start()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
transactions.csv
2+
watermark
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Subscriber Example: Transaction Record
2+
3+
This example demonstrates how to use the `AlgorandSubscriber` to record all of the balance changes for a given account. This example uses the filesystem to persist the watermark and record transactions in a CSV file. This example makes use of the `"balance_changes"` filter option which will include ANY transaction that affects the balance of the given account. This example uses `"sync_behaviour": "sync-oldest"` to ensure that we get all historical data from an archival node. An indexer could be used for catchup, but due to the complex nature of the query it would not save any API calls like it would with a more simple query (such as the one in the [governance example](../governance/README.md)).
4+
5+
## Created Files
6+
7+
`watermark` will be created with the last processed round and updated with each new round processed.
8+
9+
`transactions.csv` will be created with the header `round,sender,receiver,amount` and will append a new row for each transaction processed.
10+
11+
## Running the example
12+
13+
To run the example, execute the following commands:
14+
15+
### Install dependencies
16+
17+
```bash
18+
poetry install
19+
```
20+
21+
### Run the script
22+
23+
```bash
24+
poetry run python governance.py
25+
```
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
from pathlib import Path
2+
3+
from algokit_subscriber.subscriber import AlgorandSubscriber
4+
from algokit_subscriber.types.subscription import SubscribedTransaction
5+
from algokit_utils.beta.algorand_client import AlgorandClient
6+
7+
algorand = AlgorandClient.main_net()
8+
9+
# The desired address to track
10+
track_address = "PDS6KDTDQBBIL34FZSZWL3CEO454VKAMGWRPPP2D52W52WJW2OBDUQJRZM"
11+
12+
# The list of balance changes to write to the CSV file
13+
balance_changes = []
14+
15+
# The directory of this file
16+
this_dir = Path(__file__).parent
17+
18+
watermark_file = this_dir / "watermark"
19+
20+
# If watermark file doesn't exist, create it with a value of 0
21+
if not watermark_file.exists():
22+
watermark_file.write_text("43932536")
23+
24+
# The CSV file we will write all balance changes of our tracked address to
25+
csv_file = this_dir / "transactions.csv"
26+
if not csv_file.exists():
27+
csv_file.write_text("round,txn_id,asset_id,amount\n")
28+
29+
30+
def get_watermark() -> int:
31+
"""
32+
The get_watermark tells the subscriber what the last processed block is
33+
We are using the filesystem to ensure the value is persisted in the event of failures, power outages, etc.
34+
"""
35+
return int(watermark_file.read_text())
36+
37+
38+
def set_watermark(new_watermark: int) -> None:
39+
"""
40+
Write the new transactions to the CSV file and then write the new watermark value to the watermark file
41+
This order is important to ensure we are not increasing the watermark value if we fail to write the transactions to the CSV file
42+
"""
43+
csv_lines = "\n".join(",".join(str(v) for v in bc) for bc in balance_changes)
44+
with Path.open(csv_file, "a") as f:
45+
f.write(csv_lines)
46+
if csv_lines:
47+
f.write("\n")
48+
49+
watermark_file.write_text(str(new_watermark))
50+
balance_changes.clear()
51+
52+
53+
subscriber = AlgorandSubscriber(
54+
algod_client=algorand.client.algod,
55+
indexer_client=algorand.client.indexer,
56+
config={
57+
"filters": [
58+
{
59+
"name": "Tracked Address",
60+
# Only match non-zero ALGO transfers
61+
"filter": {
62+
"balance_changes": [
63+
{
64+
"address": track_address,
65+
},
66+
],
67+
},
68+
},
69+
],
70+
# Once we are caught up, always wait until the next block is available and process it immediately once available
71+
"wait_for_block_when_at_tip": True,
72+
# The watermark persistence functions are used to get and set the watermark
73+
"watermark_persistence": {"get": get_watermark, "set": set_watermark},
74+
# Sync starting from the last watermark using an archival algod node
75+
"sync_behaviour": "sync-oldest",
76+
},
77+
)
78+
79+
80+
def record_transaction(transaction: SubscribedTransaction, _: str) -> None:
81+
"""
82+
This is an EventListener callback. We use the .on function below to attach this callback to specific events.
83+
84+
Every EventListener callback will receive two arguments:
85+
* The transaction data
86+
* The filter name (from the 'filters' list) that the transaction matched (not used in this example)
87+
"""
88+
global balance_changes # noqa: PLW0602
89+
90+
for bc in transaction["balance_changes"]:
91+
balance_changes.append(
92+
[
93+
transaction["confirmed-round"],
94+
transaction["id"],
95+
bc["asset_id"],
96+
bc["amount"],
97+
]
98+
)
99+
100+
101+
# Attach the callback to the events we are interested in
102+
subscriber.on("Tracked Address", record_transaction)
103+
104+
# Start the subscriber
105+
subscriber.start()

0 commit comments

Comments
 (0)