add airdrop

This commit is contained in:
Swenschaeferjohann
2024-12-20 06:12:30 +00:00
parent e0ff4399df
commit c13ab12aee
7 changed files with 483 additions and 422 deletions

View File

@@ -17,7 +17,6 @@ import {
getTokenDataByAddress,
getTokenDataByTicker,
stakeWithJup,
createCompressedAirdrop,
sendCompressedAirdrop,
} from "../tools";
import { CollectionOptions, PumpFunTokenOptions } from "../types";
@@ -142,17 +141,20 @@ export class SolanaAgentKit {
return stakeWithJup(this, amount);
}
async airdropCompressedTokens(
async sendCompressedAirdrop(
mintAddress: string,
amount: number,
recipients: string[]
) {
await createCompressedAirdrop(
recipients: string[],
priorityFeeInLamports: number,
shouldLog: boolean
): Promise<string[]> {
return await sendCompressedAirdrop(
this,
new PublicKey(mintAddress),
BigInt(amount),
recipients.map((recipient) => new PublicKey(recipient))
amount,
recipients.map((recipient) => new PublicKey(recipient)),
priorityFeeInLamports,
shouldLog
);
return await sendCompressedAirdrop(this);
}
}

View File

@@ -710,6 +710,7 @@ export class SolanaAirdropCompressedTokensTool extends Tool {
- mintAddress: string, the mint address of the token, e.g., "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN"
- amount: number, the amount of tokens to airdrop per recipient, e.g., 42
- recipients: string[], the recipient addresses, e.g., ["JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN", "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN"]
- priorityFeeInLamports: number, the priority fee in lamports, e.g., 10_000. Default is 30_000.
`;
constructor(private solanaKit: SolanaAgentKit) {
@@ -719,18 +720,19 @@ export class SolanaAirdropCompressedTokensTool extends Tool {
protected async _call(input: string): Promise<string> {
try {
const parsedInput = JSON.parse(input);
if (parsedInput.recipients.length <= 100) {
throw new Error("Recipients array must contain at least 420 addresses");
}
await this.solanaKit.airdropCompressedTokens(
const txs = await this.solanaKit.sendCompressedAirdrop(
parsedInput.mintAddress,
parsedInput.amount,
parsedInput.recipients
parsedInput.recipients,
parsedInput.priorityFeeInLamports || 30_000,
false // no logging
);
return JSON.stringify({
status: "success",
message: `Airdropped ${parsedInput.amount} tokens to ${parsedInput.recipients.length} recipients.`,
transactionHashes: txs,
});
} catch (error: any) {
return JSON.stringify({

View File

@@ -1,228 +1,272 @@
import { PublicKey } from "@solana/web3.js";
import type { DrizzleDb, WorkerMessage, WorkerData } from "./types";
import {
AddressLookupTableAccount,
ComputeBudgetProgram,
Connection,
Keypair,
PublicKey,
TransactionInstruction,
} from "@solana/web3.js";
import { SolanaAgentKit } from "../../agent/index.js";
import {
buildAndSignTx,
calculateComputeUnitPrice,
createRpc,
Rpc,
sendAndConfirmTx,
sleep,
} from "@lightprotocol/stateless.js";
import {
CompressedTokenProgram,
createTokenPool,
} from "@lightprotocol/compressed-token";
import { Account, getOrCreateAssociatedTokenAccount } from "@solana/spl-token";
let db: DrizzleDb | null = null;
let dbInitPromise: Promise<DrizzleDb | null> | null = null;
async function configureForBrowser() {
try {
const [{ SQLocalDrizzle }, { drizzle }, { sql }, heliusCore] =
await Promise.all([
// @ts-ignore
import("sqlocal/drizzle"),
import("drizzle-orm/sqlite-proxy"),
import("drizzle-orm"),
import("helius-airship-core"),
]);
const { databaseFile } = heliusCore;
const { driver, batchDriver } = new SQLocalDrizzle({
databasePath: databaseFile,
verbose: false,
});
const database = drizzle(driver, batchDriver);
await database.run(sql`PRAGMA journal_mode = WAL;`);
await database.run(sql`PRAGMA synchronous = normal;`);
return database;
} catch (error) {
console.error("Browser database configuration failed:", error);
throw error;
}
}
async function configureForNode() {
try {
const [{ drizzle }, { default: Database }, { databaseFile }] =
await Promise.all([
import("drizzle-orm/better-sqlite3"),
import("better-sqlite3"),
import("helius-airship-core"),
]);
const sqlite = new Database(databaseFile);
sqlite.exec("PRAGMA journal_mode = WAL;");
sqlite.exec("PRAGMA synchronous = normal;");
return drizzle(sqlite);
} catch (error) {
console.error("Node database configuration failed:", error);
throw error;
}
}
async function configureDatabase(): Promise<DrizzleDb> {
if (!dbInitPromise) {
dbInitPromise = (async () => {
if (!db) {
db =
typeof window !== "undefined"
? await configureForBrowser()
: await configureForNode();
}
return db;
})();
}
const database = await dbInitPromise;
if (!database) throw new Error("Database initialization failed");
return database;
}
async function createWorker(): Promise<
Worker | import("worker_threads").Worker
> {
if (typeof window !== "undefined") {
const origin = new URL(window.location.href).origin;
if (
!origin.startsWith("https://") &&
!origin.startsWith("http://localhost")
) {
throw new Error("Invalid origin protocol");
}
const workerCode = `
self.importScripts('${origin}/airdrop-worker.js'.replace(/[<>'"]/g, ''));
`;
const blobOptions = {
type: "application/javascript",
headers: {
"Content-Security-Policy": "default-src 'self'",
},
};
const blob = new Blob([workerCode], blobOptions);
const workerUrl = URL.createObjectURL(blob);
const worker = new Worker(workerUrl);
URL.revokeObjectURL(workerUrl);
return worker;
} else {
// Node
const { Worker } = await import("worker_threads");
const path = await import("path");
return new Worker(path.resolve(__dirname, "worker.js"));
}
}
const MAX_AIRDROP_RECIPIENTS = 1000;
const MAX_CONCURRENT = 30;
/**
* Create airdrop with zk compression
* @param agent Agent
* @param mintAddress Token mint public key (non token-2022)
* @param amount amount of tokens to airdrop per recipient
* @param recipients Recipient public keys
*/
export async function createCompressedAirdrop(
agent: SolanaAgentKit,
mintAddress: PublicKey,
amount: bigint,
recipients: PublicKey[]
): Promise<void> {
try {
const database = await configureDatabase();
const { create, init } = await import("helius-airship-core");
await init({
db: database as any,
});
await create({
db: database as any,
signer: agent.wallet.publicKey,
addresses: recipients,
amount,
mintAddress,
});
} catch (error) {
console.error("Create operation failed:", error);
throw error;
}
}
/**
* Send airdrop. must be called after `createCompressedAirdrop`
* @param agent Agent
* @param onProgress Callback for progress updates
* @param onError Callback for error handling
* Send airdrop with ZK Compressed Tokens.
* @param agent Agent
* @param mintAddress SPL Mint address
* @param amount Amount to send per recipient
* @param recipients Recipient wallet addresses (no ATAs)
* @param shouldLog Whether to log progress to stdout. Defaults to false.
*/
export async function sendCompressedAirdrop(
agent: SolanaAgentKit,
onProgress?: (progress: number) => void,
onError?: (error: Error) => void
): Promise<void> {
let worker: Worker | import("worker_threads").Worker | null = null;
const { databaseFile } = await import("helius-airship-core");
mintAddress: PublicKey,
amount: number,
recipients: PublicKey[],
prioFeeInLamports: number,
shouldLog: boolean = false
): Promise<string[]> {
if (recipients.length > MAX_AIRDROP_RECIPIENTS) {
throw new Error(
`Max airdrop can be ${MAX_AIRDROP_RECIPIENTS} recipients at a time. For more scale, use open source ZK Compression airdrop tools such as https://github.com/helius-labs/airship.`
);
}
let sourceTokenAccount: Account;
try {
sourceTokenAccount = await getOrCreateAssociatedTokenAccount(
agent.connection,
agent.wallet,
mintAddress,
agent.wallet.publicKey
);
} catch (error) {
throw new Error(
"Source token account not found and failed to create it. Please add funds to your wallet and try again."
);
}
return new Promise(async (resolve, reject) => {
const cleanup = () => {
if (worker) {
try {
if ("terminate" in worker) {
worker.terminate();
}
} catch (error) {
console.error("[Main] Worker cleanup error:", error);
}
}
};
try {
await createTokenPool(
agent.connection as unknown as Rpc,
agent.wallet,
mintAddress
);
} catch (error: any) {
if (error.message.includes("already in use")) {
// skip
} else {
throw error;
}
}
try {
worker = await createWorker();
return await processAll(
agent,
amount,
mintAddress,
recipients,
prioFeeInLamports,
shouldLog
);
}
async function processAll(
agent: SolanaAgentKit,
amount: number,
mint: PublicKey,
recipients: PublicKey[],
prioFeeInLamports: number,
shouldLog: boolean
): Promise<string[]> {
const mintAddress = mint;
const payer = agent.wallet;
const message: WorkerData = {
type: "send",
data: {
secretKey: Array.from(agent.wallet.secretKey),
url: agent.connection.rpcEndpoint,
dbPath: databaseFile,
},
};
const sourceTokenAccount = await getOrCreateAssociatedTokenAccount(
agent.connection,
agent.wallet,
mintAddress,
agent.wallet.publicKey
);
const handleMessage = (event: MessageEvent<WorkerMessage>) => {
if (event.data === undefined) {
cleanup();
reject(new Error());
return;
}
const { type, data, error } = event.data;
switch (type) {
case "progress":
onProgress?.(data!);
break;
case "error":
cleanup();
const errorObj = new Error(error);
onError?.(errorObj);
reject(errorObj);
break;
case "complete":
cleanup();
resolve();
break;
}
};
const maxRecipientsPerInstruction = 5;
const maxIxs = 3; // empirically determined (as of 12/15/2024)
const lookupTableAddress = new PublicKey(
"9NYFyEqPkyXUhkerbGHXUXkvb4qpzeEdHuGpgbgpH1NJ"
);
if (typeof window !== "undefined") {
(worker as Worker).onmessage = handleMessage;
} else {
(worker as import("worker_threads").Worker).on(
"message",
handleMessage
const lookupTableAccount = (
await agent.connection.getAddressLookupTable(lookupTableAddress)
).value!;
const batches: PublicKey[][] = [];
for (
let i = 0;
i < recipients.length;
i += maxRecipientsPerInstruction * maxIxs
) {
batches.push(recipients.slice(i, i + maxRecipientsPerInstruction * maxIxs));
}
const instructionSets = await Promise.all(
batches.map(async (recipientBatch) => {
const instructions: TransactionInstruction[] = [
ComputeBudgetProgram.setComputeUnitLimit({ units: 500_000 }),
ComputeBudgetProgram.setComputeUnitPrice({
microLamports: calculateComputeUnitPrice(prioFeeInLamports, 500_000),
}),
];
const compressIxPromises = [];
for (
let i = 0;
i < recipientBatch.length;
i += maxRecipientsPerInstruction
) {
const batch = recipientBatch.slice(i, i + maxRecipientsPerInstruction);
compressIxPromises.push(
CompressedTokenProgram.compress({
payer: payer.publicKey,
owner: payer.publicKey,
source: sourceTokenAccount.address,
toAddress: batch,
amount: batch.map(() => amount),
mint: mintAddress,
})
);
}
worker.postMessage(message);
} catch (error) {
cleanup();
console.error("[Main] Send operation failed:", {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
});
onError?.(error instanceof Error ? error : new Error(String(error)));
reject(error);
const compressIxs = await Promise.all(compressIxPromises);
return [...instructions, ...compressIxs];
})
);
const url = agent.connection.rpcEndpoint;
if (url.includes("devnet")) {
throw new Error("Devnet is not supported for airdrop. Please use mainnet.");
}
if (!url.includes("helius")) {
console.warn(
"Warning: Must use RPC with ZK Compression support. Double check with your RPC provider if in doubt."
);
}
const rpc = createRpc(url, url, url);
const results = [];
let confirmedCount = 0;
const totalBatches = instructionSets.length;
const renderProgressBar = (current: number, total: number) => {
const percentage = Math.floor((current / total) * 100);
const filled = Math.floor((percentage / 100) * 20);
const empty = 20 - filled;
const bar = "█".repeat(filled) + "░".repeat(empty);
return `Airdropped to ${Math.min(current * 15, recipients.length)}/${
recipients.length
} recipients [${bar}] ${percentage}%`;
};
const log = (message: string) => {
if (shouldLog && typeof process !== "undefined" && process.stdout) {
process.stdout.write(message);
}
});
};
for (let i = 0; i < instructionSets.length; i += MAX_CONCURRENT) {
const batchPromises = instructionSets
.slice(i, i + MAX_CONCURRENT)
.map((instructions, idx) =>
sendTransactionWithRetry(
rpc,
instructions,
payer,
lookupTableAccount,
i + idx
).then((signature) => {
confirmedCount++;
log("\r" + renderProgressBar(confirmedCount, totalBatches));
return signature;
})
);
const batchResults = await Promise.allSettled(batchPromises);
results.push(...batchResults);
}
log("\n");
const failures = results
.filter((r) => r.status === "rejected")
.map((r, idx) => ({
index: idx,
error: (r as PromiseRejectedResult).reason,
}));
if (failures.length > 0) {
throw new Error(
`Failed to process ${failures.length} batches: ${failures
.map((f) => f.error)
.join(", ")}`
);
}
return results.map((r) => (r as PromiseFulfilledResult<string>).value);
}
async function sendTransactionWithRetry(
connection: Rpc,
instructions: TransactionInstruction[],
payer: Keypair,
lookupTableAccount: AddressLookupTableAccount,
batchIndex: number
): Promise<string> {
const MAX_RETRIES = 3;
const INITIAL_BACKOFF = 500; // ms
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
try {
const { blockhash } = await connection.getLatestBlockhash();
const tx = buildAndSignTx(
instructions,
payer,
blockhash,
[],
[lookupTableAccount]
);
const signature = await sendAndConfirmTx(connection, tx);
return signature;
} catch (error: any) {
const isRetryable =
error.message?.includes("blockhash not found") ||
error.message?.includes("timeout") ||
error.message?.includes("rate limit") ||
error.message?.includes("too many requests");
if (!isRetryable || attempt === MAX_RETRIES - 1) {
throw new Error(
`Batch ${batchIndex} failed after ${attempt + 1} attempts: ${
error.message
}`
);
}
const backoff =
INITIAL_BACKOFF * Math.pow(2, attempt) * (0.5 + Math.random());
await sleep(backoff);
}
}
throw new Error("Unreachable");
}

View File

@@ -1,69 +0,0 @@
import type { BetterSQLite3Database } from "drizzle-orm/better-sqlite3";
import type { SqliteRemoteDatabase } from "drizzle-orm/sqlite-proxy";
export interface Closeable {
close(): Promise<void> | void;
}
export type DrizzleDb = BetterSQLite3Database | SqliteRemoteDatabase;
export interface WorkerMessage {
type: "progress" | "error" | "complete";
data?: number;
error?: string;
}
export interface WorkerData {
type: "send" | "poll";
data: {
dbPath: string;
url: string;
secretKey?: number[];
};
}
export interface WorkerError extends Error {
code?: string;
type: "worker_error";
}
export interface DatabaseError extends Error {
code?: string;
type: "database_error";
}
export function isWorkerMessage(message: any): message is WorkerMessage {
return (
message &&
typeof message === "object" &&
"type" in message &&
(message.type === "progress" ||
message.type === "error" ||
message.type === "complete")
);
}
export function isWorkerData(data: any): data is WorkerData {
return (
data &&
typeof data === "object" &&
"type" in data &&
(data.type === "send" || data.type === "poll") &&
"data" in data &&
typeof data.data === "object" &&
typeof data.data.dbPath === "string" &&
typeof data.data.url === "string"
);
}
export function isWorkerError(error: any): error is WorkerError {
return (
error instanceof Error && "type" in error && error.type === "worker_error"
);
}
export function isDatabaseError(error: any): error is DatabaseError {
return (
error instanceof Error && "type" in error && error.type === "database_error"
);
}

View File

@@ -1,128 +0,0 @@
import { send } from "helius-airship-core";
import { Keypair } from "@solana/web3.js";
import type { WorkerMessage, WorkerData, DrizzleDb, Closeable } from "./types";
let db: DrizzleDb | null = null;
let dbInitPromise: Promise<DrizzleDb | null> | null = null;
async function initializeDb(dbPath: string): Promise<DrizzleDb> {
if (!dbInitPromise) {
dbInitPromise = (async () => {
if (!db) {
try {
if (typeof window !== "undefined") {
const [{ SQLocalDrizzle }, { drizzle }, { sql }] =
await Promise.all([
// @ts-ignore
import("sqlocal/drizzle"),
import("drizzle-orm/sqlite-proxy"),
import("drizzle-orm"),
]);
const { driver, batchDriver } = new SQLocalDrizzle({
databasePath: dbPath,
verbose: false,
});
db = drizzle(driver, batchDriver);
await db.run(sql`PRAGMA journal_mode = WAL;`);
await db.run(sql`PRAGMA synchronous = normal;`);
} else {
const [{ drizzle }, { default: Database }] = await Promise.all([
import("drizzle-orm/better-sqlite3"),
import("better-sqlite3"),
]);
const sqlite = new Database(dbPath);
sqlite.exec("PRAGMA journal_mode = WAL;");
sqlite.exec("PRAGMA synchronous = normal;");
db = drizzle(sqlite);
}
} catch (error) {
console.error("Worker database initialization failed:", error);
throw error;
}
}
return db;
})();
}
const database = await dbInitPromise;
if (!database) throw new Error("Worker database initialization failed");
return database;
}
function postMessage(message: WorkerMessage) {
if (typeof window !== "undefined") {
self.postMessage(message);
} else {
const { parentPort } = require("worker_threads");
parentPort?.postMessage(message);
}
}
async function handleMessage(data: WorkerData) {
let database: DrizzleDb | null = null;
try {
database = await initializeDb(data.data.dbPath);
switch (data.type) {
case "send":
if (!data.data.secretKey) {
throw new Error("Secret key is required for send operation");
}
await send({
db: database,
keypair: Keypair.fromSecretKey(new Uint8Array(data.data.secretKey)),
url: data.data.url,
});
break;
}
postMessage({ type: "complete" });
} catch (error) {
console.error("[Worker] Operation failed:", {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
type: data.type,
url: data.data.url,
});
postMessage({
type: "error",
error: error instanceof Error ? error.message : String(error),
});
} finally {
if (database && "close" in database) {
try {
await (database as Closeable).close();
} catch (error) {
console.error("Error closing database connection:", error);
}
}
}
}
if (typeof window !== "undefined") {
self.onmessage = (event: MessageEvent<WorkerData>) => {
handleMessage(event.data).catch((error) => {
postMessage({
type: "error",
error: error instanceof Error ? error.message : String(error),
});
});
};
} else {
const { parentPort } = require("worker_threads");
parentPort?.on("message", (data: WorkerData) => {
handleMessage(data).catch((error) => {
postMessage({
type: "error",
error: error instanceof Error ? error.message : String(error),
});
});
});
}