From d784cd2fd5a5e2eeff78946ddddeddf8a47ea0aff62fefd9b4c11d280c59ee22 Mon Sep 17 00:00:00 2001 From: Minimons Date: Mon, 15 Jun 2026 21:52:53 +0200 Subject: [PATCH] Add Jupiter alarm --- README_alarm.md | 52 +++ .../jupiterperpsalarm/AlarmAction.tjava | 8 + .../CompositeAlarmAction.tjava | 24 ++ .../ConsoleAlarmAction.tjava | 26 ++ .../DovesAgPriceFeedDecoder.tjava | 107 ++++++ .../jupiterperpsalarm/JupiterPerpsAsset.tjava | 19 ++ .../com/r35157/jupiterperpsalarm/Main.tjava | 174 ++++++++++ .../jupiterperpsalarm/OraclePrice.tjava | 16 + .../OracleWebSocketClient.tjava | 323 ++++++++++++++++++ .../r35157/jupiterperpsalarm/PriceAlarm.tjava | 65 ++++ .../jupiterperpsalarm/PriceDirection.tjava | 20 ++ .../PushoverAlarmAction.tjava | 74 ++++ 12 files changed, 908 insertions(+) create mode 100644 README_alarm.md create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/AlarmAction.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/CompositeAlarmAction.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/ConsoleAlarmAction.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/DovesAgPriceFeedDecoder.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/JupiterPerpsAsset.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/Main.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/OraclePrice.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/OracleWebSocketClient.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/PriceAlarm.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/PriceDirection.tjava create mode 100644 src/main/tjava/com/r35157/jupiterperpsalarm/PushoverAlarmAction.tjava diff --git a/README_alarm.md b/README_alarm.md new file mode 100644 index 0000000..35253ff --- /dev/null +++ b/README_alarm.md @@ -0,0 +1,52 @@ +# Jupiter Perps Price Alarm + +A small Java 17 program that listens to Jupiter Perps' on-chain aggregated oracle account through Solana WebSocket `accountSubscribe`. + +It does **not** poll once per second. Every account update observed by the connected RPC node is decoded immediately. The program reconnects automatically, performs an initial/reconnect state fetch, and can connect to multiple independent RPC endpoints for redundancy. + +## Build and test + +```bash +gradle classes +gradle run --args='--self-test' +``` + +## Monitor a SOL short liquidation threshold + +```bash +gradle run --args='--asset=SOL --target=175.00 --direction=above' +``` + +For a long position, liquidation is normally below the current price: + +```bash +gradle run --args='--asset=SOL --target=120.00 --direction=below' +``` + +## Use two RPC WebSocket streams + +A single WebSocket/RPC provider is not a durable event log. For better resilience, provide two independent endpoints: + +```bash +export SOLANA_WS_URLS='wss://first-provider.example,wss://second-provider.example' +gradle run --args='--asset=SOL --target=175 --direction=above' +``` + +The same URL is converted from `wss://` to `https://` for initial and reconnect state retrieval. This works with the usual Solana RPC endpoint format, including API-key query parameters. + +## Pushover emergency alarm + +```bash +export PUSHOVER_APP_TOKEN='...' +export PUSHOVER_USER_KEY='...' +gradle run --args='--asset=SOL --target=175 --direction=above' +``` + +The program sends `priority=2`, `retry=30`, `expire=10800`, and `sound=persistent`. + +## Important limitations + +- `processed` is intentionally used for minimum delay, but a processed update may belong to a fork that is later abandoned. +- Solana PubSub is not guaranteed delivery. Two independent RPC streams reduce, but do not eliminate, the risk of missing an update. +- The alarm reports the Jupiter Perps oracle price. It does not prove that your position was liquidated. For that, also subscribe to your Jupiter position account or relevant program transactions. +- This is an alerting aid, not a substitute for placing an on-platform stop-loss or reducing leverage. diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/AlarmAction.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/AlarmAction.tjava new file mode 100644 index 0000000..2e7e015 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/AlarmAction.tjava @@ -0,0 +1,8 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; + +@FunctionalInterface +public interface AlarmAction { + void trigger(OraclePrice price, BigDecimal target, PriceDirection direction); +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/CompositeAlarmAction.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/CompositeAlarmAction.tjava new file mode 100644 index 0000000..46dd4c0 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/CompositeAlarmAction.tjava @@ -0,0 +1,24 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.util.List; + +public final class CompositeAlarmAction implements AlarmAction { + + public CompositeAlarmAction(List actions) { + this.actions = List.copyOf(actions); + } + + @Override + public void trigger(OraclePrice price, BigDecimal target, PriceDirection direction) { + for (AlarmAction action : actions) { + try { + action.trigger(price, target, direction); + } catch (RuntimeException exception) { + System.err.println("Alarm action failed: " + exception.getMessage()); + } + } + } + + private final List actions; +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/ConsoleAlarmAction.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/ConsoleAlarmAction.tjava new file mode 100644 index 0000000..3d74fc4 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/ConsoleAlarmAction.tjava @@ -0,0 +1,26 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; + +public final class ConsoleAlarmAction implements AlarmAction { + @Override + public void trigger(OraclePrice price, BigDecimal target, PriceDirection direction) { + System.err.println(); + System.err.println("============================================================"); + System.err.printf( + "ALARM: %s is %s USD; target %s %s USD%n", + price.asset(), + price.priceUsd().toPlainString(), + direction, + target.toPlainString() + ); + System.err.printf( + "Oracle time: %s, slot: %d, source: %s%n", + price.oracleTime(), + price.slot(), + price.source() + ); + System.err.println("============================================================"); + System.err.println(); + } +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/DovesAgPriceFeedDecoder.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/DovesAgPriceFeedDecoder.tjava new file mode 100644 index 0000000..3c7b097 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/DovesAgPriceFeedDecoder.tjava @@ -0,0 +1,107 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Instant; +import java.util.Arrays; + +public final class DovesAgPriceFeedDecoder { + + public static OraclePrice decode( + JupiterPerpsAsset asset, + byte[] accountData, + long slot, + String source + ) { + if (accountData.length < ACCOUNT_SIZE) { + throw new IllegalArgumentException( + "Doves AG account is too short: " + accountData.length + + " bytes; expected at least " + ACCOUNT_SIZE + ); + } + + byte[] discriminator = Arrays.copyOfRange(accountData, 0, 8); + if (!Arrays.equals(discriminator, AG_PRICE_FEED_DISCRIMINATOR)) { + throw new IllegalArgumentException( + "Unexpected Anchor discriminator. The oracle layout may have changed." + ); + } + + BigInteger rawPrice = readUnsignedLongLittleEndian(accountData, PRICE_OFFSET); + int exponent = accountData[EXPONENT_OFFSET]; + long timestamp = ByteBuffer.wrap(accountData, TIMESTAMP_OFFSET, Long.BYTES) + .order(ByteOrder.LITTLE_ENDIAN) + .getLong(); + + BigDecimal priceUsd = new BigDecimal(rawPrice).scaleByPowerOfTen(exponent); + + return new OraclePrice( + asset, + rawPrice, + exponent, + priceUsd, + Instant.ofEpochSecond(timestamp), + slot, + source + ); + } + + public static void selfTest() { + byte[] data = new byte[ACCOUNT_SIZE]; + System.arraycopy(AG_PRICE_FEED_DISCRIMINATOR, 0, data, 0, 8); + + putUnsignedLongLittleEndian(data, PRICE_OFFSET, new BigInteger("123456789")); + data[EXPONENT_OFFSET] = (byte) -6; + ByteBuffer.wrap(data, TIMESTAMP_OFFSET, Long.BYTES) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(1_700_000_000L); + + OraclePrice decoded = decode(JupiterPerpsAsset.SOL, data, 42L, "self-test"); + if (decoded.priceUsd().compareTo(new BigDecimal("123.456789")) != 0) { + throw new IllegalStateException("Decoder self-test failed: " + decoded.priceUsd()); + } + if (!decoded.oracleTime().equals(Instant.ofEpochSecond(1_700_000_000L))) { + throw new IllegalStateException("Timestamp self-test failed"); + } + } + + private static BigInteger readUnsignedLongLittleEndian(byte[] data, int offset) { + byte[] positiveBigEndian = new byte[Long.BYTES + 1]; + for (int index = 0; index < Long.BYTES; index++) { + positiveBigEndian[Long.BYTES - index] = data[offset + index]; + } + return new BigInteger(positiveBigEndian); + } + + private static void putUnsignedLongLittleEndian( + byte[] data, + int offset, + BigInteger value + ) { + BigInteger remaining = value; + for (int index = 0; index < Long.BYTES; index++) { + data[offset + index] = remaining.byteValue(); + remaining = remaining.shiftRight(8); + } + } + + private DovesAgPriceFeedDecoder() { + } + + // Anchor discriminator: sha256("account:AgPriceFeed")[0..8] + private static final byte[] AG_PRICE_FEED_DISCRIMINATOR = { + 0x70, (byte) 0xF9, (byte) 0x8B, (byte) 0xD9, + (byte) 0xD7, (byte) 0xD0, (byte) 0xF9, 0x36 + }; + + // Layout from the Doves IDL: + // discriminator 8 + // mint 32, edgeFeed 32, clFeed 32, pythFeed 32, pythFeedId 32 + // price u64, expo i8, timestamp i64, config(u32,u32,u64), bump u8 + private static final int PRICE_OFFSET = 168; + private static final int EXPONENT_OFFSET = 176; + private static final int TIMESTAMP_OFFSET = 177; + private static final int ACCOUNT_SIZE = 202; +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/JupiterPerpsAsset.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/JupiterPerpsAsset.tjava new file mode 100644 index 0000000..f8c03e1 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/JupiterPerpsAsset.tjava @@ -0,0 +1,19 @@ +package com.r35157.jupiterperpsalarm; + +public enum JupiterPerpsAsset { + SOL("FYq2BWQ1V5P1WFBqr3qB2Kb5yHVvSv7upzKodgQE5zXh"), + ETH("AFZnHPzy4mvVCffrVwhewHbFc93uTHvDSFrVH7GtfXF1"), + BTC("hUqAT1KQ7eW1i6Csp9CXYtpPfSAvi835V7wKi5fRfmC"), + USDC("6Jp2xZUTWdDD2ZyUPRzeMdc6AFQ5K3pFgZxk2EijfjnM"), + USDT("Fgc93D641F8N2d1xLjQ4jmShuD3GE3BsCXA56KBQbF5u"); + + JupiterPerpsAsset(String oracleAccount) { + this.oracleAccount = oracleAccount; + } + + public String oracleAccount() { + return oracleAccount; + } + + private final String oracleAccount; +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/Main.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/Main.tjava new file mode 100644 index 0000000..2cf3f74 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/Main.tjava @@ -0,0 +1,174 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +public final class Main { + + public static void main(String[] args) throws Exception { + Config config; + try { + config = Config.parse(args, System.getenv()); + } catch (IllegalArgumentException exception) { + System.err.println(exception.getMessage()); + printUsage(); + System.exit(2); + return; + } + + if (config.selfTest()) { + DovesAgPriceFeedDecoder.selfTest(); + System.out.println("Decoder self-test passed."); + return; + } + + List actions = new ArrayList<>(); + actions.add(new ConsoleAlarmAction()); + + if (config.pushoverToken() != null && config.pushoverUserKey() != null) { + actions.add(new PushoverAlarmAction( + config.pushoverToken(), + config.pushoverUserKey() + )); + System.out.println("Pushover emergency alarm is enabled."); + } else { + System.out.println( + "Pushover is disabled. Set PUSHOVER_APP_TOKEN and " + + "PUSHOVER_USER_KEY to enable it." + ); + } + + AlarmAction action = new CompositeAlarmAction(actions); + PriceAlarm alarm = new PriceAlarm( + config.asset(), + config.target(), + config.direction(), + action + ); + + List clients = config.webSocketEndpoints().stream() + .map(endpoint -> new OracleWebSocketClient(endpoint, config.asset(), alarm::accept)) + .toList(); + + Runtime.getRuntime().addShutdownHook(new Thread( + () -> clients.forEach(OracleWebSocketClient::close), + "shutdown" + )); + + System.out.printf( + "Monitoring Jupiter Perps %s oracle. Alarm when price is %s %s USD.%n", + config.asset(), + config.direction(), + config.target().toPlainString() + ); + System.out.println("Oracle account: " + config.asset().oracleAccount()); + System.out.println("RPC streams: " + config.webSocketEndpoints().size()); + + clients.forEach(OracleWebSocketClient::start); + new CountDownLatch(1).await(); + } + + private static void printUsage() { + System.err.println(""" + Usage: + gradle run --args='--asset=SOL --target=175.00 --direction=above' + + Options: + --asset=SOL|ETH|BTC Default: SOL + --target= Required + --direction=above|below Default: above + --ws= Default: wss://api.mainnet-beta.solana.com + --self-test Test the binary decoder and exit + + Environment: + SOLANA_WS_URLS Comma-separated RPC WebSocket endpoints + PUSHOVER_APP_TOKEN Pushover application token + PUSHOVER_USER_KEY Pushover user/group key + + For a SOL short liquidation warning, use --direction=above. + """); + } + + private record Config( + JupiterPerpsAsset asset, + BigDecimal target, + PriceDirection direction, + List webSocketEndpoints, + String pushoverToken, + String pushoverUserKey, + boolean selfTest + ) { + private static Config parse(String[] args, Map environment) { + boolean selfTest = Arrays.asList(args).contains("--self-test"); + Map options = Arrays.stream(args) + .filter(argument -> argument.startsWith("--") && argument.contains("=")) + .map(argument -> argument.substring(2).split("=", 2)) + .collect(java.util.stream.Collectors.toMap( + parts -> parts[0], + parts -> parts[1], + (first, second) -> second + )); + + JupiterPerpsAsset asset = JupiterPerpsAsset.valueOf( + options.getOrDefault("asset", "SOL").toUpperCase(Locale.ROOT) + ); + + BigDecimal target = null; + if (!selfTest) { + String targetText = options.get("target"); + if (targetText == null || targetText.isBlank()) { + throw new IllegalArgumentException("Missing required --target="); + } + target = new BigDecimal(targetText); + if (target.signum() <= 0) { + throw new IllegalArgumentException("Target price must be positive"); + } + } + + PriceDirection direction = PriceDirection.valueOf( + options.getOrDefault("direction", "above").toUpperCase(Locale.ROOT) + ); + + String endpointText = options.get("ws"); + if (endpointText == null || endpointText.isBlank()) { + endpointText = environment.getOrDefault( + "SOLANA_WS_URLS", + "wss://api.mainnet-beta.solana.com" + ); + } + + List endpoints = Arrays.stream(endpointText.split(",")) + .map(String::trim) + .filter(value -> !value.isBlank()) + .map(URI::create) + .toList(); + + if (endpoints.isEmpty()) { + throw new IllegalArgumentException("At least one WebSocket endpoint is required"); + } + + return new Config( + asset, + target, + direction, + endpoints, + blankToNull(environment.get("PUSHOVER_APP_TOKEN")), + blankToNull(environment.get("PUSHOVER_USER_KEY")), + selfTest + ); + } + + private static String blankToNull(String value) { + return value == null || value.isBlank() ? null : value; + } + } + + private Main() { + } +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/OraclePrice.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/OraclePrice.tjava new file mode 100644 index 0000000..4e1a6f8 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/OraclePrice.tjava @@ -0,0 +1,16 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; + +public record OraclePrice( + JupiterPerpsAsset asset, + BigInteger rawPrice, + int exponent, + BigDecimal priceUsd, + Instant oracleTime, + long slot, + String source +) { +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/OracleWebSocketClient.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/OracleWebSocketClient.tjava new file mode 100644 index 0000000..0c4330c --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/OracleWebSocketClient.tjava @@ -0,0 +1,323 @@ +package com.r35157.jupiterperpsalarm; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Base64; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class OracleWebSocketClient implements AutoCloseable { + + public OracleWebSocketClient( + URI webSocketEndpoint, + JupiterPerpsAsset asset, + Consumer priceConsumer + ) { + this.webSocketEndpoint = Objects.requireNonNull(webSocketEndpoint); + this.httpEndpoint = toHttpEndpoint(webSocketEndpoint); + this.asset = Objects.requireNonNull(asset); + this.priceConsumer = Objects.requireNonNull(priceConsumer); + this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread( + runnable, + "oracle-ws-" + this.asset.name().toLowerCase() + "-" + + this.webSocketEndpoint.getHost() + ); + thread.setDaemon(true); + return thread; + }); + } + + public void start() { + if (!running.compareAndSet(false, true)) { + return; + } + + scheduler.scheduleAtFixedRate(this::sendHeartbeat, 20, 20, TimeUnit.SECONDS); + connect(); + } + + @Override + public void close() { + running.set(false); + WebSocket webSocket = currentWebSocket.getAndSet(null); + if (webSocket != null) { + webSocket.abort(); + } + scheduler.shutdownNow(); + } + + private void connect() { + if (!running.get()) { + return; + } + + System.out.println("Connecting to " + webSocketEndpoint); + httpClient.newWebSocketBuilder() + .connectTimeout(Duration.ofSeconds(15)) + .buildAsync(webSocketEndpoint, new Listener()) + .whenComplete((webSocket, error) -> { + if (error != null) { + System.err.printf( + "WebSocket connection failed for %s: %s%n", + webSocketEndpoint, + error.getMessage() + ); + scheduleReconnect(); + } + }); + } + + private void subscribe(WebSocket webSocket) { + String request = """ + {"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["%s",{"encoding":"base64","commitment":"processed"}]} + """.formatted(asset.oracleAccount()).trim(); + + webSocket.sendText(request, true) + .whenComplete((ignored, error) -> { + if (error != null) { + System.err.println("Subscription request failed: " + error.getMessage()); + } else { + System.out.printf( + "Subscribed to %s oracle account %s with processed commitment.%n", + asset, + asset.oracleAccount() + ); + } + }); + } + + private void fetchCurrentState() { + String body = """ + {"jsonrpc":"2.0","id":2,"method":"getAccountInfo","params":["%s",{"encoding":"base64","commitment":"processed"}]} + """.formatted(asset.oracleAccount()).trim(); + + HttpRequest request = HttpRequest.newBuilder(httpEndpoint) + .timeout(Duration.ofSeconds(15)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build(); + + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete((response, error) -> { + if (error != null) { + System.err.printf( + "Initial oracle fetch failed for %s: %s%n", + httpEndpoint, + error.getMessage() + ); + return; + } + if (response.statusCode() != 200) { + System.err.printf( + "Initial oracle fetch returned HTTP %d from %s%n", + response.statusCode(), + httpEndpoint + ); + return; + } + processJson(response.body()); + }); + } + + private void processJson(String json) { + if (json.contains("\"error\"")) { + System.err.println("Solana RPC error from " + webSocketEndpoint + ": " + json); + return; + } + + Matcher dataMatcher = DATA_PATTERN.matcher(json); + if (!dataMatcher.find()) { + return; // Usually the accountSubscribe acknowledgement. + } + + long slot = -1L; + Matcher slotMatcher = SLOT_PATTERN.matcher(json); + if (slotMatcher.find()) { + slot = Long.parseLong(slotMatcher.group(1)); + } + + try { + byte[] accountData = Base64.getDecoder().decode(dataMatcher.group(1)); + OraclePrice price = DovesAgPriceFeedDecoder.decode( + asset, + accountData, + slot, + webSocketEndpoint.toString() + ); + priceConsumer.accept(price); + } catch (RuntimeException exception) { + System.err.printf( + "Could not decode oracle data from %s: %s%n", + webSocketEndpoint, + exception.getMessage() + ); + } + } + + private void sendHeartbeat() { + WebSocket webSocket = currentWebSocket.get(); + if (running.get() && webSocket != null && !webSocket.isOutputClosed()) { + webSocket.sendPing(ByteBuffer.wrap(new byte[]{1})) + .exceptionally(error -> { + System.err.println("WebSocket heartbeat failed: " + error.getMessage()); + return null; + }); + } + } + + private void scheduleReconnect() { + if (!running.get() || !reconnectScheduled.compareAndSet(false, true)) { + return; + } + + long delay = reconnectDelaySeconds; + reconnectDelaySeconds = Math.min(reconnectDelaySeconds * 2, 30); + System.err.printf("Reconnecting to %s in %d seconds.%n", webSocketEndpoint, delay); + + scheduler.schedule(() -> { + reconnectScheduled.set(false); + connect(); + }, delay, TimeUnit.SECONDS); + } + + private static URI toHttpEndpoint(URI webSocketEndpoint) { + String scheme = switch (webSocketEndpoint.getScheme()) { + case "wss" -> "https"; + case "ws" -> "http"; + default -> throw new IllegalArgumentException( + "WebSocket endpoint must use ws:// or wss://" + ); + }; + + try { + return new URI( + scheme, + webSocketEndpoint.getUserInfo(), + webSocketEndpoint.getHost(), + webSocketEndpoint.getPort(), + webSocketEndpoint.getPath(), + webSocketEndpoint.getQuery(), + webSocketEndpoint.getFragment() + ); + } catch (Exception exception) { + throw new IllegalArgumentException("Could not derive HTTP RPC endpoint", exception); + } + } + + private final class Listener implements WebSocket.Listener { + @Override + public void onOpen(WebSocket webSocket) { + currentWebSocket.set(webSocket); + reconnectDelaySeconds = 1; + reconnectScheduled.set(false); + subscribe(webSocket); + fetchCurrentState(); + webSocket.request(1); + } + + @Override + public CompletionStage onText( + WebSocket webSocket, + CharSequence data, + boolean last + ) { + synchronized (textBuffer) { + textBuffer.append(data); + if (last) { + String json = textBuffer.toString(); + textBuffer.setLength(0); + processJson(json); + } + } + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onBinary( + WebSocket webSocket, + ByteBuffer data, + boolean last + ) { + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onPing(WebSocket webSocket, ByteBuffer message) { + webSocket.request(1); + return webSocket.sendPong(message); + } + + @Override + public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onClose( + WebSocket webSocket, + int statusCode, + String reason + ) { + currentWebSocket.compareAndSet(webSocket, null); + System.err.printf( + "WebSocket closed by %s: code=%d reason=%s%n", + webSocketEndpoint, + statusCode, + reason + ); + scheduleReconnect(); + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + currentWebSocket.compareAndSet(webSocket, null); + System.err.printf( + "WebSocket error from %s: %s%n", + webSocketEndpoint, + error.getMessage() + ); + scheduleReconnect(); + } + + private final StringBuilder textBuffer = new StringBuilder(); + } + + private final URI webSocketEndpoint; + private final URI httpEndpoint; + private final JupiterPerpsAsset asset; + private final Consumer priceConsumer; + private final HttpClient httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(15)) + .build(); + private final ScheduledExecutorService scheduler; + private final AtomicReference currentWebSocket = new AtomicReference<>(); + private final AtomicBoolean running = new AtomicBoolean(); + private final AtomicBoolean reconnectScheduled = new AtomicBoolean(); + + private volatile long reconnectDelaySeconds = 1; + + private static final Pattern DATA_PATTERN = Pattern.compile( + "\\\"data\\\"\\s*:\\s*\\[\\s*\\\"([A-Za-z0-9+/=]+)\\\"\\s*,\\s*\\\"base64\\\"\\s*]" + ); + private static final Pattern SLOT_PATTERN = Pattern.compile( + "\\\"slot\\\"\\s*:\\s*(\\d+)" + ); +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/PriceAlarm.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/PriceAlarm.tjava new file mode 100644 index 0000000..d5dea53 --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/PriceAlarm.tjava @@ -0,0 +1,65 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.time.Duration; +import java.time.Instant; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public final class PriceAlarm { + + public PriceAlarm( + JupiterPerpsAsset asset, + BigDecimal target, + PriceDirection direction, + AlarmAction action + ) { + this.asset = asset; + this.target = target; + this.direction = direction; + this.action = action; + } + + public synchronized void accept(OraclePrice price) { + String eventKey = price.rawPrice() + ":" + price.exponent() + ":" + + price.oracleTime().getEpochSecond(); + if (!recentEvents.add(eventKey)) { + return; + } + trimRecentEvents(); + + long ageSeconds = Duration.between(price.oracleTime(), Instant.now()).getSeconds(); + System.out.printf( + "%s %s=%s USD oracleAge=%ds slot=%d source=%s%n", + Instant.now(), + asset, + price.priceUsd().toPlainString(), + ageSeconds, + price.slot(), + price.source() + ); + + if (direction.reached(price.priceUsd(), target) && triggered.compareAndSet(false, true)) { + action.trigger(price, target, direction); + } + } + + private void trimRecentEvents() { + while (recentEvents.size() > MAX_RECENT_EVENTS) { + Iterator iterator = recentEvents.iterator(); + iterator.next(); + iterator.remove(); + } + } + + private final JupiterPerpsAsset asset; + private final BigDecimal target; + private final PriceDirection direction; + private final AlarmAction action; + private final AtomicBoolean triggered = new AtomicBoolean(); + private final Set recentEvents = new LinkedHashSet<>(); + + private static final int MAX_RECENT_EVENTS = 512; +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/PriceDirection.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/PriceDirection.tjava new file mode 100644 index 0000000..2811f3a --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/PriceDirection.tjava @@ -0,0 +1,20 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; + +public enum PriceDirection { + ABOVE { + @Override + public boolean reached(BigDecimal price, BigDecimal target) { + return price.compareTo(target) >= 0; + } + }, + BELOW { + @Override + public boolean reached(BigDecimal price, BigDecimal target) { + return price.compareTo(target) <= 0; + } + }; + + public abstract boolean reached(BigDecimal price, BigDecimal target); +} diff --git a/src/main/tjava/com/r35157/jupiterperpsalarm/PushoverAlarmAction.tjava b/src/main/tjava/com/r35157/jupiterperpsalarm/PushoverAlarmAction.tjava new file mode 100644 index 0000000..70c5d0d --- /dev/null +++ b/src/main/tjava/com/r35157/jupiterperpsalarm/PushoverAlarmAction.tjava @@ -0,0 +1,74 @@ +package com.r35157.jupiterperpsalarm; + +import java.math.BigDecimal; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; + +public final class PushoverAlarmAction implements AlarmAction { + + public PushoverAlarmAction(String applicationToken, String userKey) { + this.applicationToken = applicationToken; + this.userKey = userKey; + } + + @Override + public void trigger(OraclePrice price, BigDecimal target, PriceDirection direction) { + String title = "Jupiter Perps " + price.asset() + " alarm"; + String message = String.format( + "%s is %s USD. Target: %s %s USD. Oracle time: %s. Slot: %d.", + price.asset(), + price.priceUsd().toPlainString(), + direction, + target.toPlainString(), + price.oracleTime(), + price.slot() + ); + + String body = form("token", applicationToken) + "&" + + form("user", userKey) + "&" + + form("title", title) + "&" + + form("message", message) + "&" + + "priority=2&retry=30&expire=10800&sound=persistent"; + + HttpRequest request = HttpRequest.newBuilder(PUSHOVER_URI) + .timeout(Duration.ofSeconds(15)) + .header("Content-Type", "application/x-www-form-urlencoded") + .header("User-Agent", "jupiter-perps-price-alarm/1.0") + .POST(HttpRequest.BodyPublishers.ofString(body)) + .build(); + + httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + .whenComplete((response, error) -> { + if (error != null) { + System.err.println("Pushover failed: " + error.getMessage()); + return; + } + if (response.statusCode() != 200 || !response.body().contains("\"status\":1")) { + System.err.printf( + "Pushover rejected the alarm: HTTP %d: %s%n", + response.statusCode(), + response.body() + ); + } else { + System.out.println("Pushover emergency alarm sent."); + } + }); + } + + private static String form(String name, String value) { + return URLEncoder.encode(name, StandardCharsets.UTF_8) + "=" + + URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private final HttpClient httpClient = HttpClient.newHttpClient(); + private final String applicationToken; + private final String userKey; + + private static final URI PUSHOVER_URI = + URI.create("https://api.pushover.net/1/messages.json"); +}