diff --git a/at_client/README.md b/at_client/README.md index 1a5994a0..639a2f6a 100644 --- a/at_client/README.md +++ b/at_client/README.md @@ -70,7 +70,7 @@ The SDK currently depends on the following ## Example Usage The command line classes under -[src/main/java/org/atsign/client/cli](src/main/java/org/atsign/client/cli) +[src/main/java/org/atsign/client/cli](src/main/java/org/atsign/client/impl/cli) serve as simple examples of how to instantiate an AtClient instance and invoke its interface. @@ -135,11 +135,11 @@ mvn exec:java -Dexec.mainClass=org.atsign.client.cli.Delete \ ### Register -The utility **org.atsign.client.cli.Register** can be used to perform +The utility **org.atsign.client.cli.register.Register** can be used to perform registration operations. ```shell -mvn exec:java -Dexec.mainClass=org.atsign.client.cli.Register -Dexec.args="--help" +mvn exec:java -Dexec.mainClass=org.atsign.client.cli.register.Register -Dexec.args="--help" ``` When using the SUPER_API Key to register an atsign, the following sequence of diff --git a/at_client/pom.xml b/at_client/pom.xml index 1f3daa51..897b3e3a 100644 --- a/at_client/pom.xml +++ b/at_client/pom.xml @@ -17,6 +17,11 @@ ../config/java-format.xml ../config/checkstyle.xml + + + 0.77 + 0.7 + @@ -85,6 +90,11 @@ maven-checkstyle-plugin + + org.jacoco + jacoco-maven-plugin + + org.codehaus.mojo exec-maven-plugin @@ -120,6 +130,11 @@ bcprov-jdk15to18 + + io.netty + netty-all + + info.picocli picocli @@ -136,6 +151,12 @@ provided + + org.bouncycastle + bcpkix-jdk18on + test + + org.junit.jupiter junit-jupiter diff --git a/at_client/src/main/java/org/atsign/client/api/AtClient.java b/at_client/src/main/java/org/atsign/client/api/AtClient.java index 5fde566b..c3570947 100644 --- a/at_client/src/main/java/org/atsign/client/api/AtClient.java +++ b/at_client/src/main/java/org/atsign/client/api/AtClient.java @@ -1,225 +1,257 @@ package org.atsign.client.api; -import static org.atsign.common.Keys.AtKey; -import static org.atsign.common.Keys.PublicKey; -import static org.atsign.common.Keys.SelfKey; -import static org.atsign.common.Keys.SharedKey; +import static org.atsign.client.api.Keys.*; -import java.io.Closeable; -import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; -import org.atsign.client.api.impl.clients.AtClientImpl; -import org.atsign.client.api.impl.connections.AtRootConnection; -import org.atsign.client.api.impl.connections.DefaultAtConnectionFactory; -import org.atsign.client.api.impl.events.SimpleAtEventBus; -import org.atsign.client.api.impl.secondaries.RemoteSecondary; -import org.atsign.common.AtException; -import org.atsign.common.AtSign; -import org.atsign.common.exceptions.AtSecondaryConnectException; -import org.atsign.common.exceptions.AtSecondaryNotFoundException; -import org.atsign.common.options.GetRequestOptions; +import lombok.Builder; +import lombok.Value; /** - * The primary interface of the AtSign client library. + * Represents something which enables users to interact with an AtServer as + * a map-like interface. + * Users are able to invoke put, get and delete operations for the different key + * types ({@link PublicKey}, {@link SelfKey} and {@link SharedKey}). + * Users can also subscribe to notifications, an {@link AtClient} is an + * {@link org.atsign.client.api.AtEvents.AtEventBus}, allowing users to + * register {@link org.atsign.client.api.AtEvents.AtEventListener}s. + * {@link AtClient}s are closeable resources and should be treated accordingly. */ -@SuppressWarnings("unused") -public interface AtClient extends Secondary, AtEvents.AtEventBus, Closeable { +public interface AtClient extends AtEvents.AtEventBus, AutoCloseable { /** - * Standard AtClient factory - uses production @ root to look up the cloud secondary address for - * this atSign + * The methods in this interface are from the perspective of this {@link AtSign}. + * For example when executing put the sharedBy {@link AtSign} + * should be the {@link AtSign} of the {@link AtClient}. * - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @return An {@link AtClient} - * @throws AtException if something goes wrong with looking up or connecting to the remote secondary + * @return the {@link AtSign} which this {@link AtClient} is representing. */ - static AtClient withRemoteSecondary(AtSign atSign, AtKeys keys) throws AtException { - return withRemoteSecondary("root.atsign.org:64", atSign, keys); - } + AtSign getAtSign(); /** - * Standard AtClient factory - uses production @ root to look up the cloud secondary address for - * this atSign + * Users are able to get this and execute At Protocol commands directly. * - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @param verbose set to true for chatty logs - * @return An {@link AtClient} - * @throws AtException if something goes wrong with looking up or connecting to the remote secondary + * @return The underlying {@link AtCommandExecutor} which this {@link AtClient} is using to interact + * the At Server. */ - static AtClient withRemoteSecondary(AtSign atSign, AtKeys keys, boolean verbose) throws AtException { - return withRemoteSecondary("root.atsign.org:64", atSign, keys, verbose); - } + AtCommandExecutor getCommandExecutor(); /** - * Factory to use when you wish to use a custom Secondary.AddressFinder - * - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @param secondaryAddressFinder will be used to find the Secondary.Address of the atSign - * @return An {@link AtClient} - * @throws AtException if any other exception occurs while connecting to the remote (cloud) - * secondary - */ - static AtClient withRemoteSecondary(AtSign atSign, AtKeys keys, Secondary.AddressFinder secondaryAddressFinder) - throws AtException { - Secondary.Address remoteSecondaryAddress; - try { - remoteSecondaryAddress = secondaryAddressFinder.findSecondary(atSign); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to find secondary, with IOException", e); - } - return withRemoteSecondary(atSign, keys, remoteSecondaryAddress, false); - } + * Used to get a String associated with a shared key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy or the sharedWith for the key. + * + * @param sharedKey A {@link SharedKey} + * @return A {@link CompletableFuture} for the String value associated with this key. + */ + CompletableFuture get(SharedKey sharedKey); /** - * Factory - returns default AtClientImpl with a RemoteSecondary and a DefaultConnectionFactory + * Used to get a byte array associated with a shared key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy or the sharedWith for the key. * - * @param rootUrl the address of the root server to use - e.g. root.atsign.org:64 for production - * at-signs - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @return An {@link AtClient} - * @throws AtException if anything goes wrong during construction + * @param sharedKey A {@link SharedKey} + * @return A {@link CompletableFuture} for the byte array value associated with this key. */ - static AtClient withRemoteSecondary(String rootUrl, AtSign atSign, AtKeys keys) throws AtException { - return withRemoteSecondary(rootUrl, atSign, keys, false); - } + CompletableFuture getBinary(SharedKey sharedKey); /** - * Factory - returns default AtClientImpl with a RemoteSecondary and a DefaultConnectionFactory + * Used to associate a String with a shared key. + * The {@link AtClient} {@link AtSign} should be the sharedBy or the + * sharedWith for the key. * - * @param rootUrl the address of the root server to use - e.g. root.atsign.org:64 for production - * at-signs - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @param verbose set to true for chatty logs - * @return An {@link AtClient} - * @throws AtException if anything goes wrong during construction + * @param sharedKey A {@link SharedKey}. + * @param value The string value (this cannot be null). + * @return A {@link CompletableFuture} for this operation. */ - static AtClient withRemoteSecondary(String rootUrl, AtSign atSign, AtKeys keys, boolean verbose) throws AtException { - DefaultAtConnectionFactory connectionFactory = new DefaultAtConnectionFactory(); - - Secondary.Address secondaryAddress; - try { - AtRootConnection rootConnection = connectionFactory.getRootConnection(new SimpleAtEventBus(), rootUrl, verbose); - rootConnection.connect(); - secondaryAddress = rootConnection.findSecondary(atSign); - } catch (AtSecondaryNotFoundException e) { - throw e; - } catch (Exception e) { - throw new AtSecondaryNotFoundException("Failed to lookup remote secondary", e); - } - - return withRemoteSecondary(atSign, keys, secondaryAddress, verbose); - } + CompletableFuture put(SharedKey sharedKey, String value); /** - * Factory to use when you wish to use a custom Secondary.AddressFinder + * Used to delete a shared key. + * The {@link AtClient} {@link AtSign} should be the sharedBy for the key. * - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @param verbose set to true for chatty logs - * @return An {@link AtClient} - * @throws IOException if thrown by the address finder - * @throws AtException if any other exception occurs while connecting to the remote (cloud) - * secondary + * @param sharedKey A {@link SharedKey}. + * @return A {@link CompletableFuture} for this operation. */ - static AtClient withRemoteSecondary(AtSign atSign, AtKeys keys, Secondary.AddressFinder secondaryAddressFinder, - boolean verbose) - throws IOException, AtException { - Secondary.Address remoteSecondaryAddress = secondaryAddressFinder.findSecondary(atSign); - return withRemoteSecondary(atSign, keys, remoteSecondaryAddress, verbose); - } + CompletableFuture delete(SharedKey sharedKey); /** - * Factory to use when you already know the address of the remote (cloud) secondary + * Used to get a String associated with a self key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. * - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @param remoteSecondaryAddress the address of the remote secondary server - * @param verbose set to true for chatty logs - * @return An {@link AtClient} - * @throws AtException if any other exception occurs while connecting to the remote (cloud) - * secondary + * @param selfKey A {@link SelfKey} + * @return A {@link CompletableFuture} for the String value associated with this key. */ - static AtClient withRemoteSecondary(AtSign atSign, AtKeys keys, Secondary.Address remoteSecondaryAddress, - boolean verbose) - throws AtException { - DefaultAtConnectionFactory connectionFactory = new DefaultAtConnectionFactory(); - AtEvents.AtEventBus eventBus = new SimpleAtEventBus(); - - RemoteSecondary secondary; - try { - secondary = new RemoteSecondary(eventBus, atSign, remoteSecondaryAddress, keys, connectionFactory, verbose); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to create RemoteSecondary", e); - } - - return new AtClientImpl(eventBus, atSign, keys, secondary); - } + CompletableFuture get(SelfKey selfKey); /** - * Factory to use when you already know the address of the remote (cloud) secondary + * Used to get a byte array associated with a self key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. * - * @param remoteSecondaryAddress the address of the remote secondary server - * @param atSign the {@link AtSign} of this client - e.g. @alice - * @param keys the {@link AtKeys} for this client - * @return An {@link AtClient} - * @throws AtException if any other exception occurs while connecting to the remote (cloud) - * secondary + * @param selfKey A {@link SelfKey} + * @return A {@link CompletableFuture} for the byte array value associated with this key. */ - static AtClient withRemoteSecondary(Secondary.Address remoteSecondaryAddress, AtSign atSign, AtKeys keys) - throws AtException { - return withRemoteSecondary(atSign, keys, remoteSecondaryAddress, false); - } - - - - AtSign getAtSign(); - - Secondary getSecondary(); - - AtKeys getEncryptionKeys(); - - CompletableFuture get(SharedKey sharedKey); + CompletableFuture getBinary(SelfKey selfKey); - CompletableFuture getBinary(SharedKey sharedKey); + /** + * Used to associate a String with a self key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. + * + * @param selfKey A {@link SelfKey}. + * @param value The string value to be associated with the {@link SelfKey} (this cannot be null). + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture put(SelfKey selfKey, String value); - CompletableFuture put(SharedKey sharedKey, String value); + /** + * Used to delete a self key. + * The {@link AtClient} {@link AtSign} should be the sharedBy for the key. + * + * @param selfKey A {@link SelfKey}. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture delete(SelfKey selfKey); - CompletableFuture delete(SharedKey sharedKey); + /** + * Used to get a String associated with a specific key from the perspective of the + * {@link AtClient}'s {@link AtSign}. + * + * @param publicKey A {@link PublicKey}. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture get(PublicKey publicKey); - CompletableFuture get(SelfKey selfKey); + /** + * Used to get a String associated with a public key. + * + * @param publicKey A {@link PublicKey}. + * @param options Can be used to control the caching behavior. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture get(PublicKey publicKey, GetRequestOptions options); - CompletableFuture getBinary(SelfKey selfKey); + /** + * Used to get a byte array associated with a public key. + * + * @param publicKey A {@link PublicKey}. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture getBinary(PublicKey publicKey); - CompletableFuture put(SelfKey selfKey, String value); + /** + * Used to get a byte array associated with a public key. + * + * @param publicKey A {@link PublicKey}. + * @param options Can be used to control the caching behavior. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture getBinary(PublicKey publicKey, GetRequestOptions options); - CompletableFuture delete(SelfKey selfKey); + /** + * Used to associate a String with a public key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. + * + * @param publicKey A {@link PublicKey}. + * @param value The string value associated with the {@link PublicKey} (this cannot be null). + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture put(PublicKey publicKey, String value); - CompletableFuture get(PublicKey publicKey); + /** + * Used to delete a public key. + * The {@link AtClient} {@link AtSign} should be the sharedBy for the key. + * + * @param publicKey A {@link PublicKey}. + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture delete(PublicKey publicKey); - CompletableFuture get(PublicKey publicKey, GetRequestOptions getRequestOptions); + /** + * Used to associate a byte array with a shared key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. + * + * @param sharedKey A {@link SharedKey}. + * @param value The byte array to associated with the {@link SharedKey} + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture put(SharedKey sharedKey, byte[] value); - CompletableFuture getBinary(PublicKey publicKey); + /** + * Used to associate a byte array with a self key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. + * + * @param selfKey A {@link SelfKey}. + * @param value The byte array to associated with the {@link SelfKey} + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture put(SelfKey selfKey, byte[] value); - CompletableFuture getBinary(PublicKey publicKey, GetRequestOptions getRequestOptions); + /** + * Used to associate a byte array with a public key from the perspective of the + * {@link AtClient}'s {@link AtSign}.The {@link AtClient} {@link AtSign} should be the + * sharedBy for the key. + * + * @param publicKey A {@link PublicKey}. + * @param value The byte array to associated with the {@link PublicKey} + * @return A {@link CompletableFuture} for this operation. + */ + CompletableFuture put(PublicKey publicKey, byte[] value); - CompletableFuture put(PublicKey publicKey, String value); + /** + * Used to retrieve a list of typed {@link AtKeys} that are visible to this {@link AtClient}'s + * {@link AtSign} and have a key name what matches a regular expression. + * + * @param regex A regular expression String that will be be used to filter the returned keys. + * @return list of {@link AtKeys} subclass instances. + */ + default CompletableFuture> getAtKeys(String regex) { + return getAtKeys(regex, false); + } - CompletableFuture delete(PublicKey publicKey); + /** + * Used to retrieve a list of typed {@link AtKeys}, with metadata populated that are visible + * to this {@link AtClient}'s {@link AtSign} and have a key name what matches a regular expression. + * + * @param regex A regular expression String that will be be used to filter the returned keys. + * @param fetchMetadata If true then for lookup metadata for every key. + * @return list of {@link AtKeys} subclass instances. + */ + CompletableFuture> getAtKeys(String regex, boolean fetchMetadata); - CompletableFuture put(SharedKey sharedKey, byte[] value); + /** + * Used to send a monitor command and to ensure notifications are dispatched to registered + * {@link org.atsign.client.api.AtEvents.AtEventListener}s. + */ + void startMonitor(); - CompletableFuture put(SelfKey selfKey, byte[] value); + /** + * Used to "clear" the monitor command. + * {@link org.atsign.client.api.AtEvents.AtEventListener}s. + */ + void stopMonitor(); - CompletableFuture put(PublicKey publicKey, byte[] value); + /** + * Used to check if this {@link AtClient} has sent a monitor command. + * + * @return true is monitor has been set (and not stopped). + */ + boolean isMonitorRunning(); - CompletableFuture> getAtKeys(String regex); + /** + * Data class used to model options to the {@link org.atsign.client.api.AtClient} get methods + */ + @Value + @Builder + class GetRequestOptions { + boolean bypassCache; + } - CompletableFuture> getAtKeys(String regex, boolean fetchMetadata); } diff --git a/at_client/src/main/java/org/atsign/client/api/AtCommandExecutor.java b/at_client/src/main/java/org/atsign/client/api/AtCommandExecutor.java new file mode 100644 index 00000000..5c66fb5e --- /dev/null +++ b/at_client/src/main/java/org/atsign/client/api/AtCommandExecutor.java @@ -0,0 +1,86 @@ +package org.atsign.client.api; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +/** + * Represents something that is capable of sending At Protocol commands, + * and receiving responses, to and from At Server command interface. + * {@link AtCommandExecutor}s are closeable resources and should be treated accordingly. + */ +public interface AtCommandExecutor extends AutoCloseable { + + /** + * Asynchronously executes an At Protocol command that is expected to return a response. + * + * @param command the command to execute + * @param future the future which will be completed with the response + */ + void send(String command, CompletableFuture future); + + /** + * Asynchronously executes an At Protocol command that is expected to return a response. + * + * @param command the command to execute + * @return the future which will be completed with the response + */ + default CompletableFuture send(String command) { + CompletableFuture future = new CompletableFuture<>(); + send(command, future); + return future; + } + + /** + * Synchronously executes an At Protocol command that is expected to return a response. + * This method will return once the command has been sent to the AtSign server and the response + * has been received. + * + * @param command the command to execute + * @return the response + */ + String sendSync(String command) throws ExecutionException, InterruptedException; + + /** + * Asynchronously executes an At Protocol command that is expected to return a stream of events. + * + * @param command the command to execute + * @param consumer the consumer which the connection will invoke with each event + * @param future the future which will be completed when the command is sent and acknowledged + */ + void send(String command, Consumer consumer, CompletableFuture future); + + /** + * Asynchronously executes an At Protocol command that is expected to return a stream of events. + * + * @param command the command to execute + * @param consumer the consumer which the connection will invoke with each event + * @return a future which will be completed when the command is sent and acknowledged + */ + default CompletableFuture send(String command, Consumer consumer) { + CompletableFuture future = new CompletableFuture<>(); + send(command, consumer, future); + return future; + } + + /** + * Synchronously executes an At Protocol command that is expected to return a stream of events. + * This method will return once the command has been sent to the AtSign server and some form of + * acknowledgement has been received. + * + * @param command the command to execute + * @param consumer the consumer which the connection will invoke with each event + */ + void sendSync(String command, Consumer consumer) throws ExecutionException, InterruptedException; + + /** + * Registers a consumer that will be invoked when the connection is ready for sending commands. + * This is intended to be used to execute commands that set up state, for example to ensure that + * a connection is authenticated or that a connection is registered for notifications. + * If this connection is already ready then the consumer will be invoked immediately. + * + * @param consumer a consumer that will perform commands + * @return this (to allow chaining / fluent style invocation) + */ + AtCommandExecutor onReady(Consumer consumer); +} diff --git a/at_client/src/main/java/org/atsign/client/api/AtConnection.java b/at_client/src/main/java/org/atsign/client/api/AtConnection.java deleted file mode 100644 index 21ace30b..00000000 --- a/at_client/src/main/java/org/atsign/client/api/AtConnection.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.atsign.client.api; - -import java.io.IOException; -import java.net.Socket; - -import org.atsign.common.AtException; - -/** - * A simple abstraction around connections to @ platform services - e.g. the root server and - * secondary servers - */ -@SuppressWarnings("unused") -public interface AtConnection { - String getUrl(); - - String getHost(); - - int getPort(); - - Socket getSocket(); - - boolean isConnected(); - - boolean isAutoReconnect(); - - boolean isVerbose(); - - void setVerbose(boolean verbose); - - void connect() throws IOException, AtException; - - void disconnect(); - - String executeCommand(String command) throws IOException; - - /** - * Represents something which can implement atprotocol authentication workflow - * by executing commands with a {@link AtConnection} - */ - interface Authenticator { - void authenticate(AtConnection connection) throws AtException, IOException; - } - -} diff --git a/at_client/src/main/java/org/atsign/client/api/AtConnectionFactory.java b/at_client/src/main/java/org/atsign/client/api/AtConnectionFactory.java deleted file mode 100644 index 8936769f..00000000 --- a/at_client/src/main/java/org/atsign/client/api/AtConnectionFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.atsign.client.api; - -import org.atsign.client.api.impl.connections.AtRootConnection; -import org.atsign.client.api.impl.connections.AtSecondaryConnection; -import org.atsign.common.AtSign; - -/** - * For getting a hold of AtConnections to things. We inject an AtConnectionFactory into - * AtClientImpl, primarily for testability - */ -public interface AtConnectionFactory { - AtSecondaryConnection getSecondaryConnection(AtEvents.AtEventBus eventBus, - AtSign atSign, - Secondary.Address secondaryAddress, - AtConnection.Authenticator authenticator); - - AtSecondaryConnection getSecondaryConnection(AtEvents.AtEventBus eventBus, - AtSign atSign, - String secondaryUrl, - AtConnection.Authenticator authenticator, - boolean verbose); - - AtSecondaryConnection getSecondaryConnection(AtEvents.AtEventBus eventBus, - AtSign atSign, - Secondary.Address secondaryAddress, - AtConnection.Authenticator authenticator, - boolean verbose); - - AtRootConnection getRootConnection(AtEvents.AtEventBus eventBus, String rootUrl); - - AtRootConnection getRootConnection(AtEvents.AtEventBus eventBus, String rootUrl, boolean verbose); -} diff --git a/at_client/src/main/java/org/atsign/client/api/AtEvents.java b/at_client/src/main/java/org/atsign/client/api/AtEvents.java index 401d36cf..4c526664 100644 --- a/at_client/src/main/java/org/atsign/client/api/AtEvents.java +++ b/at_client/src/main/java/org/atsign/client/api/AtEvents.java @@ -4,29 +4,29 @@ import java.util.Set; /** - * Parent interface for related interfaces + * Parent interface for event related interfaces and enum. */ public interface AtEvents { /** - * Represents something that will receive events + * Listener interface for events. */ interface AtEventListener { void handleEvent(AtEventType eventType, Map eventData); } /** - * Represents something that can dispatch events to registered {@link AtEventListener} + * Represents something that can dispatch events to registered {@link AtEventListener} instances. */ interface AtEventBus { /** - * @param listener to handle various events which originate from Secondaries - * @param eventTypes the set of EventTypes that the listener is interested in + * @param listener The handler / callback which will receive events. + * @param eventTypes The set of EventTypes that the listener is interested in. */ void addEventListener(AtEventListener listener, Set eventTypes); /** - * @param listener the listener to remove + * @param listener The listener to remove. */ void removeEventListener(AtEventListener listener); diff --git a/at_client/src/main/java/org/atsign/client/api/AtKeyNames.java b/at_client/src/main/java/org/atsign/client/api/AtKeyNames.java index eee0d3ab..4a1997d7 100644 --- a/at_client/src/main/java/org/atsign/client/api/AtKeyNames.java +++ b/at_client/src/main/java/org/atsign/client/api/AtKeyNames.java @@ -1,9 +1,7 @@ package org.atsign.client.api; -import org.atsign.common.AtSign; - /** - * Constants and Utility methods for "well known" standard keys + * Constants and utility methods for "well known" keys. */ public class AtKeyNames { @@ -54,4 +52,9 @@ public static String toSharedWithMeKeyName(AtSign sharedBy, AtSign sharedWith) { return String.format("%s:%s%s", SHARED_KEY, sharedWith, sharedBy); } + + public static boolean isManagementKeyName(String s) { + return s.matches(".+\\.__manage@.+"); + } + } diff --git a/at_client/src/main/java/org/atsign/client/api/AtKeys.java b/at_client/src/main/java/org/atsign/client/api/AtKeys.java index 3b5171d2..53d5aea2 100644 --- a/at_client/src/main/java/org/atsign/client/api/AtKeys.java +++ b/at_client/src/main/java/org/atsign/client/api/AtKeys.java @@ -7,17 +7,18 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.atsign.client.util.EnrollmentId; - import lombok.Builder; import lombok.Value; +import org.atsign.client.impl.common.EnrollmentId; /** - * An immutable class used to hold an {@link org.atsign.client.api.AtClient}s keys. + * An immutable class used to hold an {@link org.atsign.client.api.AtClient}s keys. These + * are use for authentication and encryption. *

* Examples: * *

+ *
  * AtKeys keys = AtKeys.builder()
  *     .selfEncryptKey(generateAESKeyBase64())
  *     .apkamKeyPair(generateRSAKeyPair())
@@ -56,13 +57,13 @@ public class AtKeys {
    * request uses this key to encrypt the {@link #selfEncryptKey} and {@link #encryptPrivateKey}
    * in the response that it sends.
    * The process which requested the enrollment can then decrypt and store those keys.
-   * This ensures that all {@link AtKeys} got and {@link org.atsign.common.AtSign} share the same
+   * This ensures that all {@link AtKeys} got and {@link AtSign} share the same
    * {@link #selfEncryptKey} and {@link #encryptPrivateKey}
    */
   String apkamSymmetricKey;
 
   /**
-   * Encryption Key used to encrypt {@link org.atsign.common.Keys.SelfKey}s and the pkam and
+   * Encryption Key used to encrypt {@link Keys.SelfKey}s and the pkam and
    * encryption key pairs
    * when they are externalised as JSON
    */
@@ -70,16 +71,16 @@ public class AtKeys {
 
   /**
    * This is used to encrypt the symmetric keys that are used to encrypt
-   * {@link org.atsign.common.Keys.SharedKey}s
-   * where the shared with {@link org.atsign.common.AtSign} is this {@link org.atsign.common.AtSign}
+   * {@link Keys.SharedKey}s
+   * where the shared with {@link AtSign} is this {@link AtSign}
    */
 
   String encryptPublicKey;
 
   /**
    * This is used to decrypt the symmetric keys that are used to encrypt
-   * {@link org.atsign.common.Keys.SharedKey}s
-   * where the shared with {@link org.atsign.common.AtSign} is this {@link org.atsign.common.AtSign}
+   * {@link Keys.SharedKey}s
+   * where the shared with {@link AtSign} is this {@link AtSign}
    */
   String encryptPrivateKey;
 
@@ -129,8 +130,26 @@ public Map getCache() {
     return Collections.unmodifiableMap(cache);
   }
 
+
   /**
-   * Builder utility.
+   * A builder for instantiating {@link AtKeys}.
+   *
+   * 
+   *
+   * AtKeys keys = AtKeys.builder()
+   *     .selfEncryptKey(EncryptionUtil.generateAESKeyBase64())
+   *     .apkamKeyPair(EncryptionUtil.generateRSAKeyPair())
+   *     .apkamSymmetricKey(EncryptionUtil.generateAESKeyBase64())
+   *     .build();
+   * 
+ * + * NOTE {@link AtKeys} are immutable, so use the toBuilder() method + * to create a modified instance. + * + *
+   *
+   * AtKeys newKeys = keys.toBuilder().enrollmentId(enrollmentId).build();
+   * 
*/ public static class AtKeysBuilder { diff --git a/at_client/src/main/java/org/atsign/common/AtSign.java b/at_client/src/main/java/org/atsign/client/api/AtSign.java similarity index 69% rename from at_client/src/main/java/org/atsign/common/AtSign.java rename to at_client/src/main/java/org/atsign/client/api/AtSign.java index 2a3b1eea..101b54a5 100644 --- a/at_client/src/main/java/org/atsign/common/AtSign.java +++ b/at_client/src/main/java/org/atsign/client/api/AtSign.java @@ -1,7 +1,7 @@ -package org.atsign.common; +package org.atsign.client.api; -import org.atsign.client.util.Preconditions; -import org.atsign.client.util.TypedString; +import org.atsign.client.impl.common.Preconditions; +import org.atsign.client.impl.common.TypedString; /** @@ -9,7 +9,7 @@ */ public class AtSign extends TypedString { - public AtSign(String s) { + private AtSign(String s) { super(formatAtSign(s)); } @@ -18,10 +18,10 @@ public String withoutPrefix() { } /** - * Factory method + * A factory method for instantiating {@link AtSign} instances. * * @param s the string representation of the Atsign (can be with our without @ prefix) - * @return null is s is null or blank, otherwise the corresponding {@link AtSign} for s + * @return null if param is null or blank, otherwise the corresponding {@link AtSign} for the param */ public static AtSign createAtSign(String s) { return s != null && !s.isBlank() ? new AtSign(s) : null; diff --git a/at_client/src/main/java/org/atsign/common/Keys.java b/at_client/src/main/java/org/atsign/client/api/Keys.java similarity index 85% rename from at_client/src/main/java/org/atsign/common/Keys.java rename to at_client/src/main/java/org/atsign/client/api/Keys.java index 1c88d0d7..635cb35b 100644 --- a/at_client/src/main/java/org/atsign/common/Keys.java +++ b/at_client/src/main/java/org/atsign/client/api/Keys.java @@ -1,9 +1,9 @@ -package org.atsign.common; +package org.atsign.client.api; -import static org.atsign.client.util.Preconditions.*; -import static org.atsign.common.AtSign.createAtSign; -import static org.atsign.common.Metadata.*; +import static org.atsign.client.api.AtSign.createAtSign; +import static org.atsign.client.api.Metadata.*; +import static org.atsign.client.impl.common.Preconditions.*; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -121,13 +121,15 @@ public String namespace() { } /** - * * @return the name without the namespace component. */ public String nameWithoutNamespace() { return stripNamespace(name); } + /** + * @return the rawKey. + */ @Override public String toString() { return rawKey; @@ -146,7 +148,7 @@ protected String createRawKey() { } /** - * Represents a "public key" in the Atsign Platform + * Models a "public key" in the Atsign Platform specification. */ public static class PublicKey extends AtKey { protected PublicKey(AtSign sharedBy, String name, Metadata metadata) { @@ -176,7 +178,29 @@ public static PublicKey publicKey(AtSign sharedBy, String name, String namespace } /** - * Represents a "self key" in the Atsign Platform + * A builder for instantiating {@link Keys.PublicKey} instances. + * + *
+
+   * Keys.PublicKey key = Keys.publicKeyBuilder()
+   *     .sharedBy(...)   // the AtSign which is sharing the key
+   *     .name(...)       // the key name
+   *     .namespace(...)
+   *     .ttl(...)
+   *     .ttb(...)
+   *     .ttr(...)
+   *     .ccd(...)
+   *     .isCached(...)
+   *     .isBinary(...)
+   *     .metadata(...)
+   *     .build();
+   * 
+ */ + public static class PublicKeyBuilder { + } + + /** + * Models a "self key" in the Atsign Platform specification. */ public static class SelfKey extends AtKey { protected SelfKey(AtSign sharedBy, AtSign sharedWith, String name, Metadata metadata) { @@ -207,7 +231,30 @@ public static SelfKey selfKey(AtSign sharedBy, AtSign sharedWith, String name, S } /** - * Represents a "shared key" in the Atsign Platform + * A builder for instantiating {@link Keys.SelfKey} instances. + * + *
+
+   * Keys.SelfKey key = Keys.selfKeyBuilder()
+   *     .sharedBy(...)   // the AtSign which is sharing the key
+   *     .sharedWith(...)
+   *     .name(...)       // the key name
+   *     .namespace(...)
+   *     .ttl(...)
+   *     .ttb(...)
+   *     .ttr(...)
+   *     .ccd(...)
+   *     .isHidden(...)
+   *     .isBinary(...)
+   *     .metadata(...)
+   *     .build();
+   * 
+ */ + public static class SelfKeyBuilder { + } + + /** + * Models a "shared key" in the Atsign Platform specification. */ public static class SharedKey extends AtKey { protected SharedKey(AtSign sharedBy, AtSign sharedWith, String name, Metadata metadata) { @@ -246,6 +293,31 @@ public static SharedKey sharedKey(AtSign sharedBy, AtSign sharedWith, String nam return new SharedKey(sharedBy, sharedWith, toName(name, namespace), metadata); } + /** + * A builder for instantiating {@link Keys.SharedKey} instances. + * + *
+
+   * Keys.SharedKey key = Keys.sharedKeyBuilder()
+   *     .sharedBy(...)   // the AtSign which is sharing the key
+   *     .sharedWith(...) // the AtSign which the key is being shared with
+   *     .name(...)       // the key name
+   *     .namespace(...)
+   *     .ttl(...)
+   *     .ttb(...)
+   *     .ttr(...)
+   *     .ccd(...)
+   *     .isCached(...)
+   *     .isHidden(...)
+   *     .isBinary(...)
+   *     .metadata(...)
+   *     .rawKey(...)     // the raw key i.e. @sharedWith:name@sharedBy
+   *     .build();
+   * 
+ */ + public static class SharedKeyBuilder { + } + /** * Represents a "private hidden key" in the Atsign Platform */ @@ -274,6 +346,26 @@ public static AtKey key(String rawKey, Metadata metadata) { throw new IllegalArgumentException(rawKey + " does NOT match any raw key parser"); } + /** + * A builder for instantiating typed {@link AtKey} instances from raw key names. + * e.g. @sharedWith:name@sharedBy or public:name@sharedBy. This builder will + * decode everything based on the raw key name. + * + *
+   *
+   * Keys.AtKey key = Keys.keyBuilder().rawKey(...).build();
+   * 
+ * + * Metadata can also be provided. + * + *
+   *
+   * Keys.AtKey key = Keys.keyBuilder().rawKey(...).metadata(...).build();
+   * 
+ */ + public static class KeyBuilder { + } + private interface RawKeyParser extends Predicate { T parse(String rawKey, Metadata metadata); } diff --git a/at_client/src/main/java/org/atsign/common/Metadata.java b/at_client/src/main/java/org/atsign/client/api/Metadata.java similarity index 95% rename from at_client/src/main/java/org/atsign/common/Metadata.java rename to at_client/src/main/java/org/atsign/client/api/Metadata.java index 6895fe6a..48f66d8f 100644 --- a/at_client/src/main/java/org/atsign/common/Metadata.java +++ b/at_client/src/main/java/org/atsign/client/api/Metadata.java @@ -1,13 +1,13 @@ -package org.atsign.common; +package org.atsign.client.api; import java.time.OffsetDateTime; -import com.fasterxml.jackson.core.JsonProcessingException; import lombok.Builder; import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.jackson.Jacksonized; +import org.atsign.client.impl.util.JsonUtils; /** * Value class which models key metadata in the Atsign Platform @@ -44,20 +44,23 @@ public class Metadata { String encoding; String ivNonce; - // required for successful javadoc - /** * A builder for instantiating {@link Metadata} instances. Note: Metadata is immutable so if you - * want create a modified instance then use the toBuilder() method, override the fields and invoke - * build(). + * want to create a modified instance then use the toBuilder() method, override the fields and + * invoke build(). */ public static class MetadataBuilder { + // required for javadoc }; - public static Metadata fromJson(String json) throws JsonProcessingException { - return Json.MAPPER.readValue(json, Metadata.class); + public static Metadata fromJson(String json) { + return JsonUtils.readValue(json, Metadata.class); } + /** + * + * @return the encoded metadata fields as recognized by an At Server in an update command. + */ @Override public String toString() { return new StringBuilder() diff --git a/at_client/src/main/java/org/atsign/client/api/Secondary.java b/at_client/src/main/java/org/atsign/client/api/Secondary.java deleted file mode 100644 index 5fccbad0..00000000 --- a/at_client/src/main/java/org/atsign/client/api/Secondary.java +++ /dev/null @@ -1,191 +0,0 @@ -package org.atsign.client.api; - -import java.io.Closeable; -import java.io.IOException; - -import org.atsign.common.AtException; -import org.atsign.common.AtSign; -import org.atsign.common.exceptions.*; - -/** - * Clients ultimately talk to a Secondary server - usually this is a microservice which implements - * the @ protocol server spec, running somewhere in the cloud. - *
- * In the initial implementation we just have AtClientImpl talking to a RemoteSecondary which in - * turn - * talks, via TLS over a secure socket, to the cloud Secondary server. - *
- * As we implement client-side offline storage, performance caching etc., we can expect e.g. - *
- * AtClient {@code ->} FastCacheSecondary {@code ->} OfflineStorageSecondary {@code ->} - * RemoteSecondary - *
- * where FastCacheSecondary might be an in-memory LRU cache, and OfflineStorageSecondary is a - * persistent cache of some or all of the information in the RemoteSecondary. To make this - * possible, each Secondary will need to be able to fully handle the @ protocol, thus the - * interface is effectively the same as when interacting with a cloud secondary via openssl - * from command line. - */ -public interface Secondary extends AtEvents.AtEventListener, Closeable { - /** - * @param command in @ protocol format - * @param throwExceptionOnErrorResponse sometimes we want to inspect an error response, - * sometimes we want to just throw an exception - * @return response in @ protocol format - * @throws AtException if there was an error response and throwExceptionOnErrorResponse is true - * @throws IOException if one is encountered - */ - Response executeCommand(String command, boolean throwExceptionOnErrorResponse) throws IOException, AtException; - - void startMonitor(); - - void stopMonitor(); - - boolean isMonitorRunning(); - - /** - * Used to hold the partially decoded response from a {@link Secondary} - */ - class Response { - private String rawDataResponse = null; - private String rawErrorResponse; - private String errorCode; - private String errorText; - - public String getRawDataResponse() { - return rawDataResponse; - } - - public void setRawDataResponse(String s) { - rawDataResponse = s; - rawErrorResponse = null; - errorCode = null; - errorText = null; - } - - public String getRawErrorResponse() { - return rawErrorResponse; - } - - public void setRawErrorResponse(String s) { - // In format "AT1234-meaning of error code : " - rawErrorResponse = s; - rawDataResponse = null; - - int codeDelimiter = rawErrorResponse.indexOf(":"); - String errorCodeSegment = rawErrorResponse.substring(0, codeDelimiter).trim(); - String[] separatedByHyphen = errorCodeSegment.split("-"); - errorCode = separatedByHyphen[0].trim(); - errorText = rawErrorResponse.substring(codeDelimiter + 1).trim(); - } - - public boolean isError() { - return rawErrorResponse != null; - } - - public String getErrorCode() { - return errorCode; - } - - public String getErrorText() { - return errorText; - } - - @Override - public String toString() { - if (isError()) { - return "error:" + rawErrorResponse; - } else { - return "data:" + rawDataResponse; - } - } - - public AtException getException() { - if (!isError()) { - return null; - } - if (AtServerRuntimeException.CODE.equals(errorCode)) { - return new AtServerRuntimeException(errorText); - } else if (AtInvalidSyntaxException.CODE.equals(errorCode)) { - return new AtInvalidSyntaxException(errorText); - } else if (AtBufferOverFlowException.CODE.equals(errorCode)) { - return new AtBufferOverFlowException(errorText); - } else if (AtOutboundConnectionLimitException.CODE.equals(errorCode)) { - return new AtOutboundConnectionLimitException(errorText); - } else if (AtSecondaryNotFoundException.CODE.equals(errorCode)) { - return new AtSecondaryNotFoundException(errorText); - } else if (AtHandShakeException.CODE.equals(errorCode)) { - return new AtHandShakeException(errorText); - } else if (AtUnauthorizedException.CODE.equals(errorCode)) { - return new AtUnauthorizedException(errorText); - } else if (AtInternalServerError.CODE.equals(errorCode)) { - return new AtInternalServerError(errorText); - } else if (AtInternalServerException.CODE.equals(errorCode)) { - return new AtInternalServerException(errorText); - } else if (AtInboundConnectionLimitException.CODE.equals(errorCode)) { - return new AtInboundConnectionLimitException(errorText); - } else if (AtBlockedConnectionException.CODE.equals(errorCode)) { - return new AtBlockedConnectionException(errorText); - } else if (AtKeyNotFoundException.CODE.equals(errorCode)) { - return new AtKeyNotFoundException(errorText); - } else if (AtInvalidAtKeyException.CODE.equals(errorCode)) { - return new AtInvalidAtKeyException(errorText); - } else if (AtSecondaryConnectException.CODE.equals(errorCode)) { - return new AtSecondaryConnectException(errorText); - } else if (AtIllegalArgumentException.CODE.equals(errorCode)) { - return new AtIllegalArgumentException(errorText); - } else if (AtTimeoutException.CODE.equals(errorCode)) { - return new AtTimeoutException(errorText); - } else if (AtServerIsPausedException.CODE.equals(errorCode)) { - return new AtServerIsPausedException(errorText); - } else if (AtUnauthenticatedException.CODE.equals(errorCode)) { - return new AtUnauthenticatedException(errorText); - } - - return new AtNewErrorCodeWhoDisException(errorCode, errorText); - } - } - - /** - * Value class for hostname and port tuple - */ - class Address { - public final String host; - public final int port; - - public Address(String host, int port) { - this.host = host; - this.port = port; - } - - public static Address fromString(String hostAndPort) throws IllegalArgumentException { - String[] split = hostAndPort.split(":"); - if (split.length != 2) { - throw new IllegalArgumentException( - "Cannot construct Secondary.Address from malformed host:port string '" + hostAndPort + "'"); - } - String host = split[0]; - int port; - try { - port = Integer.parseInt(split[1]); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - "Cannot construct Secondary.Address from malformed host:port string '" + hostAndPort + "'"); - } - return new Address(host, port); - } - - @Override - public String toString() { - return host + ":" + port; - } - } - - /** - * Represents something that, given an {@link AtSign}, can resolve the {@link Address} of the - * {@link Secondary} for this {@link AtSign} - */ - interface AddressFinder { - Address findSecondary(AtSign atSign) throws IOException, AtSecondaryNotFoundException; - } -} diff --git a/at_client/src/main/java/org/atsign/client/api/impl/clients/AtClientImpl.java b/at_client/src/main/java/org/atsign/client/api/impl/clients/AtClientImpl.java deleted file mode 100644 index 0a97523f..00000000 --- a/at_client/src/main/java/org/atsign/client/api/impl/clients/AtClientImpl.java +++ /dev/null @@ -1,796 +0,0 @@ -package org.atsign.client.api.impl.clients; - -import static org.atsign.client.api.AtEvents.AtEventType.decryptedUpdateNotification; -import static org.atsign.client.api.AtKeyNames.toSharedByMeKeyName; -import static org.atsign.client.util.Preconditions.checkNotNull; -import static org.atsign.common.VerbBuilders.*; -import static org.atsign.common.VerbBuilders.LookupOperation.all; -import static org.atsign.common.VerbBuilders.LookupOperation.meta; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; - - -import lombok.extern.slf4j.Slf4j; -import org.atsign.client.api.AtClient; -import org.atsign.client.api.AtEvents.AtEventBus; -import org.atsign.client.api.AtEvents.AtEventListener; -import org.atsign.client.api.AtEvents.AtEventType; -import org.atsign.client.api.AtKeyNames; -import org.atsign.client.api.AtKeys; -import org.atsign.client.api.Secondary; -import org.atsign.client.util.EncryptionUtil; -import org.atsign.common.*; -import org.atsign.common.Keys.AtKey; -import org.atsign.common.Keys.PublicKey; -import org.atsign.common.Keys.SelfKey; -import org.atsign.common.Keys.SharedKey; -import org.atsign.common.exceptions.*; -import org.atsign.common.options.GetRequestOptions; -import org.atsign.common.response_models.LookupResponse; - -import com.fasterxml.jackson.core.JsonProcessingException; - -/** - * Implementation of an {@link AtClient} which wraps a {@link Secondary} - * in order to implement the "map like" features of the {@link AtClient} interface - */ -@SuppressWarnings({"RedundantThrows", "unused"}) -@Slf4j -public class AtClientImpl implements AtClient { - - // Factory method - creates an AtClientImpl with a RemoteSecondary - - private final AtSign atSign; - - @Override - public AtSign getAtSign() { - return atSign; - } - - private final AtKeys keys; - - @Override - public AtKeys getEncryptionKeys() { - return keys; - } - - private final Secondary secondary; - - @Override - public Secondary getSecondary() { - return secondary; - } - - private final AtEventBus eventBus; - - public AtClientImpl(AtEventBus eventBus, AtSign atSign, AtKeys keys, Secondary secondary) { - this.eventBus = eventBus; - this.atSign = atSign; - this.keys = keys; - this.secondary = secondary; - checkNotNull(keys.getEncryptPrivateKey(), "AtKeys have not been fully enrolled"); - eventBus.addEventListener(this, EnumSet.allOf(AtEventType.class)); - } - - @Override - public void startMonitor() { - secondary.startMonitor(); - } - - @Override - public void stopMonitor() { - secondary.stopMonitor(); - } - - @Override - public boolean isMonitorRunning() { - return secondary.isMonitorRunning(); - } - - @Override - public synchronized void addEventListener(AtEventListener listener, Set eventTypes) { - eventBus.addEventListener(listener, eventTypes); - } - - @Override - public synchronized void removeEventListener(AtEventListener listener) { - eventBus.removeEventListener(listener); - } - - @Override - public int publishEvent(AtEventType eventType, Map eventData) { - return eventBus.publishEvent(eventType, eventData); - } - - @Override - public synchronized void handleEvent(AtEventType eventType, Map eventData) { - switch (eventType) { - case sharedKeyNotification: - // We've got notification that someone has shared an encryption key with us - // If we also got a value, we can decrypt it and add it to our keys map - // Note: a value isn't supplied when the ttr on the shared key was set to 0 - if (eventData.get("value") != null) { - String sharedSharedKeyName = (String) eventData.get("key"); - String sharedSharedKeyEncryptedValue = (String) eventData.get("value"); - // decrypt it with our encryption private key - try { - String sharedKeyDecryptedValue = - EncryptionUtil.rsaDecryptFromBase64(sharedSharedKeyEncryptedValue, keys.getEncryptPrivateKey()); - keys.put(sharedSharedKeyName, sharedKeyDecryptedValue); - } catch (Exception e) { - log.error("caught exception {} while decrypting received shared key {}", e, sharedSharedKeyName); - } - } - break; - case updateNotification: - // Let's see if we can decrypt it on the fly - if (eventData.get("value") != null) { - String key = (String) eventData.get("key"); - String encryptedValue = (String) eventData.get("value"); - @SuppressWarnings("unchecked") - Map metadata = (Map) eventData.get("metadata"); - String ivNonce = (String) metadata.get("ivNonce"); - - try { - // decrypt it with the symmetric key that the other atSign shared with me - SharedKey sk = Keys.sharedKeyBuilder().rawKey(key).build(); - String encryptionKeySharedByOther = getEncryptionKeySharedByOther(sk); - - String decryptedValue = - EncryptionUtil.aesDecryptFromBase64(encryptedValue, encryptionKeySharedByOther, ivNonce); - HashMap newEventData = new HashMap<>(eventData); - newEventData.put("decryptedValue", decryptedValue); - eventBus.publishEvent(decryptedUpdateNotification, newEventData); - } catch (Exception e) { - log.error("caught exception while decrypting received data with key name [{}]", key, e); - } - } - break; - default: - break; - } - } - - @Override - public CompletableFuture get(SharedKey sharedKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _get(sharedKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture getBinary(SharedKey sharedKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _getBinary(sharedKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(SharedKey sharedKey, String value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(sharedKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture delete(SharedKey sharedKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _delete(sharedKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture get(SelfKey selfKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _get(selfKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture getBinary(SelfKey selfKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _getBinary(selfKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(SelfKey selfKey, String value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(selfKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture delete(SelfKey selfKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _delete(selfKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture get(PublicKey publicKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _get(publicKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture get(PublicKey publicKey, GetRequestOptions getRequestOptions) { - return CompletableFuture.supplyAsync(() -> { - try { - return _get(publicKey, getRequestOptions); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture getBinary(PublicKey publicKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _getBinary(publicKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture getBinary(PublicKey publicKey, GetRequestOptions getRequestOptions) { - return CompletableFuture.supplyAsync(() -> { - try { - return _getBinary(publicKey, getRequestOptions); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(PublicKey publicKey, String value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(publicKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture delete(PublicKey publicKey) { - return CompletableFuture.supplyAsync(() -> { - try { - return _delete(publicKey); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(SharedKey sharedKey, byte[] value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(sharedKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(SelfKey selfKey, byte[] value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(selfKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture put(PublicKey publicKey, byte[] value) { - return CompletableFuture.supplyAsync(() -> { - try { - return _put(publicKey, value); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - @Override - public CompletableFuture> getAtKeys(String regex) { - return getAtKeys(regex, true); - } - - @Override - public CompletableFuture> getAtKeys(String regex, boolean fetchMetadata) { - return CompletableFuture.supplyAsync(() -> { - try { - return _getAtKeys(regex, fetchMetadata); - } catch (Exception e) { - throw new CompletionException(e); - } - }); - } - - /** - * Synchronous, talks @-protocol directly to the client's Secondary server - * - * @param command in @ protocol format - * @param throwExceptionOnErrorResponse sometimes we want to inspect an error response, - * sometimes we want to just throw an exception - * @return a Secondary Response - * @throws AtException if the response from the Secondary starts with 'error:', or - * if there is any other exception - */ - @Override - public Response executeCommand(String command, boolean throwExceptionOnErrorResponse) - throws AtException, IOException { - return secondary.executeCommand(command, throwExceptionOnErrorResponse); - } - - @Override - public void close() throws IOException { - stopMonitor(); - secondary.close(); - } - - // ============================================================================================================================================ - // ============================================================================================================================================ - // ============================================================================================================================================ - - // - // Synchronous methods which do the actual work - // - private String _get(SharedKey sharedKey) throws AtException { - if (sharedKey.sharedBy().equals(atSign)) { - return _getSharedByMeWithOther(sharedKey); - } else { - return _getSharedByOtherWithMe(sharedKey); - } - } - - private String _getSharedByMeWithOther(SharedKey sharedKey) throws AtException { - String shareEncryptionKey = getEncryptionKeySharedByMe(sharedKey); - - String command = llookupCommandBuilder().key(sharedKey).operation(all).build(); - LookupResponse response = getLookupResponse(command); - - try { - return EncryptionUtil.aesDecryptFromBase64(response.data, shareEncryptionKey, response.metaData.ivNonce()); - } catch (Exception e) { - throw new AtDecryptionException("Failed to decrypt value with shared encryption key", e); - } - } - - private String _getSharedByOtherWithMe(SharedKey sharedKey) throws AtException { - String what; - String shareEncryptionKey = getEncryptionKeySharedByOther(sharedKey); - - String command = lookupCommandBuilder().key(sharedKey).operation(all).build(); - LookupResponse response = getLookupResponse(command); - - what = "decrypt value with shared encryption key"; - try { - return EncryptionUtil.aesDecryptFromBase64(response.data, shareEncryptionKey, response.metaData.ivNonce()); - } catch (Exception e) { - throw new AtDecryptionException("Failed to " + what, e); - } - } - - private String _put(SharedKey sharedKey, String value) throws AtException { - if (!this.atSign.equals(sharedKey.sharedBy())) { - throw new AtIllegalArgumentException( - "sharedBy is [" + sharedKey.sharedBy() + "] but should be this client's atSign [" + atSign + "]"); - } - String what = ""; - String cipherText; - try { - what = "fetch/create shared encryption key"; - String shareToEncryptionKey = getEncryptionKeySharedByMe(sharedKey); - - what = "encrypt value with shared encryption key"; - String iv = EncryptionUtil.generateRandomIvBase64(16); - sharedKey.updateMissingMetadata(Metadata.builder().ivNonce(iv).build()); - cipherText = EncryptionUtil.aesEncryptToBase64(value, shareToEncryptionKey, iv); - } catch (Exception e) { - throw new AtEncryptionException("Failed to " + what, e); - } - - String command = updateCommandBuilder().key(sharedKey).value(cipherText).build(); - - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private String _delete(SharedKey sharedKey) throws AtException { - String command = deleteCommandBuilder().key(sharedKey).build(); - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private String _get(SelfKey key) throws AtException { - // 1. build command - String command = llookupCommandBuilder().key(key).operation(all).build(); - - // 2. execute command - LookupResponse fetched = getLookupResponse(command); - - // 3. decrypt the value - String decryptedValue; - String encryptedValue = fetched.data; - String selfEncryptionKey = keys.getSelfEncryptKey(); - String iv = checkNotNull(fetched.metaData.ivNonce(), "ivNonce is null"); - decryptedValue = EncryptionUtil.aesDecryptFromBase64(encryptedValue, selfEncryptionKey, iv); - - // 4. update metadata. squash the fetchedMetadata with current key.metadata (fetchedMetadata has higher priority) - key.overwriteMetadata(fetched.metaData); - - return decryptedValue; - } - - private String _put(SelfKey selfKey, String value) throws AtException { - // 1. generate dataSignature - Metadata metadata = Metadata.builder() - .dataSignature(generateSignature(value)) - .ivNonce(EncryptionUtil.generateRandomIvBase64(16)) - .build(); - selfKey.updateMissingMetadata(metadata); - // 2. encrypt data with self encryption key - String cipherText = EncryptionUtil.aesEncryptToBase64(value, keys.getSelfEncryptKey(), metadata.ivNonce()); - - // 3. update secondary - String command = updateCommandBuilder().key(selfKey).value(cipherText).build(); - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private String _delete(SelfKey key) throws AtException { - // 1. build delete command - String command = deleteCommandBuilder().key(key).build(); - - // 2. run command - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private String _get(PublicKey key) throws AtException { - return _get(key, null); - } - - private String _get(PublicKey key, GetRequestOptions getRequestOptions) throws AtException { - // 1. build command - String command; - if (atSign.equals(key.sharedBy())) { - command = llookupCommandBuilder().key(key).operation(all).build(); - } else { - boolean bypassCache = getRequestOptions != null && getRequestOptions.getBypassCache(); - command = plookupCommandBuilder().key(key).bypassCache(bypassCache).operation(all).build(); - } - - // 2. run the command - LookupResponse fetched = getLookupResponse(command); - - // 4. update key object metadata - Metadata metadata = fetched.metaData.toBuilder().isCached(fetched.key.contains("cached:")).build(); - key.overwriteMetadata(metadata); - - // 5. return the AtValue - return fetched.data; - } - - private String _put(PublicKey publicKey, String value) throws AtException { - // 1. generate dataSignature - Metadata metadata = Metadata.builder().dataSignature(generateSignature(value)).build(); - publicKey.updateMissingMetadata(metadata); - - // 2. build command - String command = updateCommandBuilder().key(publicKey).value(value).build(); - - // 3. run command - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private String _delete(PublicKey key) throws AtException { - // 1. build command - String command = deleteCommandBuilder().key(key).build(); - - // 2. run command - try { - return secondary.executeCommand(command, true).toString(); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - } - - private byte[] _getBinary(SharedKey sharedKey) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private byte[] _getBinary(SelfKey selfKey) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private byte[] _getBinary(PublicKey publicKey) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private byte[] _getBinary(PublicKey publicKey, GetRequestOptions getRequestOptions) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private String _put(SharedKey sharedKey, byte[] value) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private String _put(SelfKey selfKey, byte[] value) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private String _put(PublicKey publicKey, byte[] value) throws AtException { - throw new RuntimeException("Not Implemented"); - } - - private List _getAtKeys(String regex, boolean fetchMetadata) throws AtException { - String scanCommand = scanCommandBuilder().regex(regex).showHidden(true).build(); - Response scanRawResponse; - try { - scanRawResponse = executeCommand(scanCommand, true); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + scanCommand, e); - } - ResponseTransformers.ScanResponseTransformer scanResponseTransformer = - new ResponseTransformers.ScanResponseTransformer(AtClientImpl::isNotManagementKey); - List rawArray = scanResponseTransformer.apply(scanRawResponse); - - List atKeys = new ArrayList<>(); - for (String atKeyRaw : rawArray) { - AtKey atKey = Keys.keyBuilder().rawKey(atKeyRaw).build(); - if (fetchMetadata) { - String llookupCommand = llookupCommandBuilder().operation(meta).rawKey(atKeyRaw).build(); - Response llookupMetaResponse; - try { - llookupMetaResponse = secondary.executeCommand(llookupCommand, true); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + llookupCommand, e); - } - try { - // atKey.metadata has priority over llookupMetaRaw.data - Metadata responseMetadata = Metadata.fromJson(llookupMetaResponse.getRawDataResponse()); - atKey.updateMissingMetadata(responseMetadata); - } catch (JsonProcessingException e) { - throw new AtResponseHandlingException("Failed to parse JSON " + llookupMetaResponse.getRawDataResponse(), e); - } - } - atKeys.add(atKey); - } - return atKeys; - } - - // ============================================================================================================================================ - // ============================================================================================================================================ - // ============================================================================================================================================ - - // - // Internal utility methods. Will move these to another class later, so that other AtClient implementations can easily use them. - // - - private LookupResponse getLookupResponse(String command) throws AtException { - Response response; - try { - response = secondary.executeCommand(command, true); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - - // 3. transform the data to a LlookupAllResponse object - LookupResponse fetched; - try { - fetched = Json.MAPPER.readValue(response.getRawDataResponse(), LookupResponse.class); - } catch (JsonProcessingException e) { - throw new AtResponseHandlingException("Failed to parse JSON " + response.getRawDataResponse(), e); - } - return fetched; - } - - private String getEncryptionKeySharedByMe(SharedKey key) throws AtException { - Secondary.Response rawResponse; - String command = llookupCommandBuilder().keyName(toSharedByMeKeyName(key.sharedWith())).sharedBy(atSign).build(); - try { - rawResponse = secondary.executeCommand(command, false); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - - if (rawResponse.isError()) { - if (rawResponse.getException() instanceof AtKeyNotFoundException) { - // No key found - so we should create one - return createSharedEncryptionKey(key); - } else { - throw rawResponse.getException(); - } - } - - // When we stored it, we encrypted it with our encryption public key; so we need to decrypt it now with our - // encryption private key - return EncryptionUtil.rsaDecryptFromBase64(rawResponse.getRawDataResponse(), keys.getEncryptPrivateKey()); - } - - private String getEncryptionKeySharedByOther(SharedKey sharedKey) throws AtException { - // Let's see if it's in our in-memory cache - String sharedSharedKeyName = AtKeyNames.toSharedWithMeKeyName(sharedKey.sharedBy(), sharedKey.sharedWith()); - - String sharedKeyValue = keys.get(sharedSharedKeyName); - if (sharedKeyValue != null) { - return sharedKeyValue; - } - - String what = ""; - - // Not in memory so now let's try to fetch from remote - e.g. if I'm @bob, lookup:shared_key@alice - String lookupCommand = lookupCommandBuilder().keyName(AtKeyNames.SHARED_KEY).sharedBy(sharedKey.sharedBy()).build(); - Response rawResponse; - try { - rawResponse = secondary.executeCommand(lookupCommand, true); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + lookupCommand, e); - } - - String sharedSharedKeyDecryptedValue; - try { - sharedSharedKeyDecryptedValue = - EncryptionUtil.rsaDecryptFromBase64(rawResponse.getRawDataResponse(), keys.getEncryptPrivateKey()); - } catch (Exception e) { - throw new AtDecryptionException("Failed to decrypt the shared_key with our encryption private key", e); - } - keys.put(sharedSharedKeyName, sharedSharedKeyDecryptedValue); - - return sharedSharedKeyDecryptedValue; - } - - private String createSharedEncryptionKey(SharedKey sharedKey) throws AtException { - // We need their public key - String theirPublicEncryptionKey = getPublicEncryptionKey(sharedKey.sharedWith()); - if (theirPublicEncryptionKey == null) { - throw new AtKeyNotFoundException(" public key " + sharedKey.sharedWith() - + " not found but service is running - maybe that AtSign has not yet been onboarded"); - } - - // Cut an AES key - String aesKey; - try { - aesKey = EncryptionUtil.generateAESKeyBase64(); - } catch (Exception e) { - throw new AtEncryptionException("Failed to generate AES key for sharing with " + sharedKey.sharedWith(), e); - } - - String what = ""; - try { - // Encrypt key with the other at-sign's publickey and save it @bob:shared_key@alice - what = "encrypt new shared key with their public key"; - String encryptedForOther = EncryptionUtil.rsaEncryptToBase64(aesKey, theirPublicEncryptionKey); - - what = "encrypt new shared key with our public key"; - // Encrypt key with our publickey and save it shared_key.bob@alice - String encryptedForUs = EncryptionUtil.rsaEncryptToBase64(aesKey, keys.getEncryptPublicKey()); - - what = "save encrypted shared key for us"; - String updateForUs = updateCommandBuilder() - .keyName(toSharedByMeKeyName(sharedKey.sharedWith())) - .sharedBy(sharedKey.sharedBy()) - .value(encryptedForUs) - .build(); - secondary.executeCommand(updateForUs, true); - - what = "save encrypted shared key for them"; - String updateForOther = updateCommandBuilder() - .keyName(AtKeyNames.SHARED_KEY) - .sharedBy(sharedKey.sharedBy()) - .sharedWith(sharedKey.sharedWith()) - .ttr(TimeUnit.HOURS.toMillis(24)) - .value(encryptedForOther) - .build(); - secondary.executeCommand(updateForOther, true); - } catch (Exception e) { - throw new AtEncryptionException("Failed to " + what, e); - } - - return aesKey; - } - - private String getPublicEncryptionKey(AtSign sharedWith) throws AtException { - Secondary.Response rawResponse; - - String command = plookupCommandBuilder().keyName(AtKeyNames.PUBLIC_ENCRYPT).sharedBy(sharedWith).build(); - try { - rawResponse = secondary.executeCommand(command, false); - } catch (IOException e) { - throw new AtSecondaryConnectException("Failed to execute " + command, e); - } - - if (rawResponse.isError()) { - if (rawResponse.getException() instanceof AtKeyNotFoundException) { - return null; - } else { - throw rawResponse.getException(); - } - } else { - return rawResponse.getRawDataResponse(); - } - } - - private String generateSignature(String value) throws AtException { - String signature; - try { - signature = EncryptionUtil.signSHA256RSA(value, keys.getEncryptPrivateKey()); - } catch (Exception e) { - throw new AtEncryptionException("Failed to sign value: " + value, e); - } - return signature; - } - - private static boolean isNotManagementKey(String s) { - return !s.matches(".+\\.__manage@.+"); - } -} diff --git a/at_client/src/main/java/org/atsign/client/api/impl/connections/AtConnectionBase.java b/at_client/src/main/java/org/atsign/client/api/impl/connections/AtConnectionBase.java deleted file mode 100644 index bacd1fa9..00000000 --- a/at_client/src/main/java/org/atsign/client/api/impl/connections/AtConnectionBase.java +++ /dev/null @@ -1,193 +0,0 @@ -package org.atsign.client.api.impl.connections; - -import java.io.IOException; -import java.io.PrintWriter; -import java.net.Socket; -import java.util.Scanner; - -import javax.net.SocketFactory; -import javax.net.ssl.SSLSocketFactory; - -import lombok.extern.slf4j.Slf4j; -import org.atsign.client.api.AtConnection; -import org.atsign.client.api.AtEvents; -import org.atsign.common.AtException; - -/** - * Core implementation of an {@link AtConnection} which uses Java NIO. This - * class contains the implementation of the socket connect / disconnect, - * writing atprotocol commands and reading atprotocol responses. - * If the socket becomes disconnected then it will be re-connected as part - * of sending the next command. - */ -@Slf4j -public abstract class AtConnectionBase implements AtConnection { - - private final String url; - - @Override - public String getUrl() { - return url; - } - - private final String host; - - @Override - public String getHost() { - return host; - } - - private final int port; - - @Override - public int getPort() { - return port; - } - - private Socket socket; - - @Override - public Socket getSocket() { - return socket; - } - - private boolean connected = false; - - @Override - public boolean isConnected() { - return connected; - } - - private final boolean autoReconnect; - - @Override - public boolean isAutoReconnect() { - return autoReconnect; - } - - protected boolean verbose; - - @Override - public boolean isVerbose() { - return verbose; - } - - @Override - public void setVerbose(boolean verbose) { - this.verbose = verbose; - } - - protected final Authenticator authenticator; - - public Authenticator getAuthenticator() { - return authenticator; - } - - protected PrintWriter socketWriter; - protected Scanner socketScanner; - - protected final AtEvents.AtEventBus eventBus; - - public AtConnectionBase(AtEvents.AtEventBus eventBus, - String url, - AtConnection.Authenticator authenticator, - boolean autoReconnect, - boolean verbose) { - this.eventBus = eventBus; - this.url = url; - this.host = url.split(":")[0]; - this.port = Integer.parseInt(url.split(":")[1]); - this.autoReconnect = autoReconnect; - this.verbose = verbose; - this.authenticator = authenticator; - } - - @Override - public synchronized void disconnect() { - if (!isConnected()) { - return; - } - connected = false; - try { - log.debug("disconnecting from {}:{}", host, port); - socket.close(); - socketScanner.close(); - socketWriter.close(); - socket.shutdownInput(); - socket.shutdownOutput(); - } catch (Exception ignore) { - } - } - - @Override - public synchronized void connect() throws IOException, AtException { - if (isConnected()) { - return; - } - SocketFactory sf = SSLSocketFactory.getDefault(); - log.debug("connecting to {}:{}...", host, port); - this.socket = sf.createSocket(host, port); - this.socketWriter = new PrintWriter(socket.getOutputStream()); - this.socketScanner = new Scanner(socket.getInputStream()); - log.debug("connected to {}:{}", host, port); - - if (authenticator != null) { - authenticator.authenticate(this); - } - connected = true; - } - - protected abstract String parseRawResponse(String rawResponse) throws IOException; - - @Override - public final synchronized String executeCommand(String command) throws IOException { - return executeCommand(command, autoReconnect, true); - } - - protected synchronized String executeCommand(String command, boolean retryOnException, boolean readTheResponse) - throws IOException { - if (socket.isClosed()) { - throw new IOException("executeCommand failed: socket is closed"); - } - try { - if (!command.endsWith("\n")) { - command = command + "\n"; - } - socketWriter.write(command); - socketWriter.flush(); - - if (verbose) { - log.info("SENT: {}", command); - } - - if (readTheResponse) { - // Responses are always terminated by newline - String rawResponse = socketScanner.nextLine(); - if (verbose) { - log.info("RCVD: {}", rawResponse); - } - - return parseRawResponse(rawResponse); - } else { - return ""; - } - } catch (Exception first) { - disconnect(); - - if (retryOnException) { - log.error("Caught exception {} : reconnecting", first.toString()); - try { - connect(); - return executeCommand(command, false, true); - } catch (Exception second) { - log.error("failed on retry", second); - throw new IOException("Failed to reconnect after original exception " + first + " : ", second); - } - } else { - connected = false; - - throw new IOException(first); - } - } - } -} diff --git a/at_client/src/main/java/org/atsign/client/api/impl/connections/AtMonitorConnection.java b/at_client/src/main/java/org/atsign/client/api/impl/connections/AtMonitorConnection.java deleted file mode 100644 index dc80d9a5..00000000 --- a/at_client/src/main/java/org/atsign/client/api/impl/connections/AtMonitorConnection.java +++ /dev/null @@ -1,258 +0,0 @@ -package org.atsign.client.api.impl.connections; - -import static org.atsign.client.api.AtEvents.AtEventBus; -import static org.atsign.client.api.AtEvents.AtEventType; -import static org.atsign.client.api.AtEvents.AtEventType.deleteNotification; -import static org.atsign.client.api.AtEvents.AtEventType.monitorException; -import static org.atsign.client.api.AtEvents.AtEventType.monitorHeartbeatAck; -import static org.atsign.client.api.AtEvents.AtEventType.sharedKeyNotification; -import static org.atsign.client.api.AtEvents.AtEventType.statsNotification; -import static org.atsign.client.api.AtEvents.AtEventType.updateNotification; - -import java.util.HashMap; - -import lombok.extern.slf4j.Slf4j; -import org.atsign.common.AtSign; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.atsign.common.Json; - -/** - * A {@link AtMonitorConnection} represents a connection to an AtServer which, - * when started will send the atprotocol monitor command and then processes - * notifications and heartbeat messages until it is stopped. If the socket disconnects - * then it will be automatically reconnected. - */ -@Slf4j -public class AtMonitorConnection extends AtSecondaryConnection implements Runnable { - - private static final ObjectMapper mapper = Json.MAPPER; - - private long _lastReceivedTime = 0; - - public long getLastReceivedTime() { - return _lastReceivedTime; - } - - public void setLastReceivedTime(long lastReceivedTime) { - this._lastReceivedTime = lastReceivedTime; - } - - private boolean running = false; - - public boolean isRunning() { - return running; - } - - private boolean _shouldBeRunning = false; - - private void setShouldBeRunning(boolean b) { - _shouldBeRunning = b; - } - - public boolean isShouldBeRunning() { - return _shouldBeRunning; - } - - public AtMonitorConnection(AtEventBus eventBus, - AtSign atSign, - String secondaryUrl, - Authenticator authenticator, - boolean verbose) { - // Note that the Monitor doesn't make use of the auto-reconnect functionality, it does its own thing - super(eventBus, atSign, secondaryUrl, authenticator, false, verbose); - startHeartbeat(); - } - - private long lastHeartbeatSentTime = System.currentTimeMillis(); - private long lastHeartbeatAckTime = System.currentTimeMillis(); - private final int heartbeatIntervalMillis = 30000; - - private void startHeartbeat() { - new Thread(() -> { - while (true) { - if (isShouldBeRunning()) { - if (!isRunning() || lastHeartbeatSentTime - lastHeartbeatAckTime >= heartbeatIntervalMillis) { - try { - // heartbeats have stopped being acked - log.error("Monitor heartbeats not being received"); - stopMonitor(); - long waitStartTime = System.currentTimeMillis(); - while (isRunning() && System.currentTimeMillis() - waitStartTime < 5000) { - // wait for monitor to stop - try { - // noinspection BusyWait - Thread.sleep(1000); - } catch (Exception ignore) { - } - } - if (isRunning()) { - log.error("Monitor thread has not stopped, but going to start another one anyway"); - } - startMonitor(); - } catch (Exception e) { - log.error("Monitor restart failed", e); - } - } else { - if (System.currentTimeMillis() - lastHeartbeatSentTime > heartbeatIntervalMillis) { - try { - executeCommand("noop:0", false, false); - lastHeartbeatSentTime = System.currentTimeMillis(); - } catch (Exception ignore) { - // Can't do anything, the heartbeat loop will take care of restarting the monitor connection - } - } - } - } - try { - // noinspection BusyWait - Thread.sleep(heartbeatIntervalMillis / 5); - } catch (Exception ignore) { - } - } - }).start(); - } - - /** - * @return true if the monitor start request has succeeded, or if the monitor is already running. - */ - @SuppressWarnings("UnusedReturnValue") - public synchronized boolean startMonitor() { - lastHeartbeatSentTime = lastHeartbeatAckTime = System.currentTimeMillis(); - - setShouldBeRunning(true); - if (!running) { - running = true; - if (!isConnected()) { - try { - connect(); - } catch (Exception e) { - log.error("startMonitor failed to connect to secondary : {}", e.getMessage()); - running = false; - return false; - } - } - new Thread(this).start(); - } - return true; - } - - public synchronized void stopMonitor() { - setShouldBeRunning(false); - lastHeartbeatSentTime = lastHeartbeatAckTime = System.currentTimeMillis(); - disconnect(); - } - - /** - * Please don't call this directly. Call startMonitor() instead, which starts the monitor in its own - * thread - */ - @SuppressWarnings("unchecked") - @Override - public void run() { - String what = ""; - // call executeCommand("monitor: