Spring Cloud & AWS ElastiCache

Spring Cloud AWS (http://cloud.spring.io/spring-cloud-aws/) provides a caching abstraction that lets you cache objects in AWS ElastiCache (https://aws.amazon.com/elasticache/) via the @Cacheable annotation. There is some documentation on this, but few real-world examples. For example, how do you run locally or in tests, where you can’t connect to ElastiCache?

Here is how we did it at Beam…

We set up the Spring AWS caching largely as per the standard instructions (http://cloud.spring.io/spring-cloud-aws/spring-cloud-aws.html#_caching). This is what is in the applicationContext.xml:

<cache:annotation-driven proxy-target-class="true"/>

<aws-cache:cache-manager id="cacheManager">
  <aws-cache:cache-ref ref="beamCache"/>
</aws-cache:cache-manager>

<aws-context:context-credentials>
  <aws-context:instance-profile-credentials/>
  <aws-context:simple-credentials access-key="${aws.key}" secret-key="${aws.secret}"/>
</aws-context:context-credentials>

The major departure here is that we use a cache-ref to a bean called “beamCache”. This lets us switch at runtime, purely through configuration, between an embedded memcached server and the ElastiCache cluster.

The beamCache bean is configured like so:

package com.beam.spring;

import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.elasticache.AmazonElastiCacheClient;
import com.google.common.collect.Lists;
import de.flapdoodle.embed.memcached.Command;
import de.flapdoodle.embed.memcached.MemcachedExecutable;
import de.flapdoodle.embed.memcached.MemcachedProcess;
import de.flapdoodle.embed.memcached.MemcachedStarter;
import de.flapdoodle.embed.memcached.config.ArtifactStoreBuilder;
import de.flapdoodle.embed.memcached.config.DownloadConfigBuilder;
import de.flapdoodle.embed.memcached.config.MemcachedConfig;
import de.flapdoodle.embed.memcached.config.RuntimeConfigBuilder;
import de.flapdoodle.embed.memcached.distribution.Version;
import de.flapdoodle.embed.process.config.IRuntimeConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.Cache;
import org.springframework.cloud.aws.cache.CacheFactory;
import org.springframework.cloud.aws.cache.ElastiCacheFactoryBean;
import org.springframework.cloud.aws.cache.memcached.MemcachedCacheFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import java.io.IOException;
import java.util.List;

@Configuration
public class CacheFactoryBean {
  private static final Log LOG = LogFactory.getLog(CacheFactoryBean.class);

  private static final int EMBEDDED_PORT = 11211;

  @Value("${aws.elasticache.cluster.id}")
  String cacheClusterId;
  @Value("${aws.region}")
  String regionName;
  @Value("${aws.elasticache.expiry.seconds}")
  int cacheExpirySeconds;

  @Bean(name = "beamCache")
  @Lazy
  Cache getCache() throws Exception {
    final MemcachedCacheFactory memcachedCacheFactory = new MemcachedCacheFactory();
    memcachedCacheFactory.setExpiryTime(cacheExpirySeconds);

    if (cacheClusterId.equals("localhost")) {
      LOG.info("Starting embedded memcached server on port " + EMBEDDED_PORT);
      embeddedMemcachedProcess(embeddedMemcachedExe());
      // Connect a memcache client to the embedded server
      LOG.info("Connecting to local memcached");
      return memcachedCacheFactory.createCache(cacheClusterId, "localhost", EMBEDDED_PORT);
    } else {
      LOG.info("Connecting to ElastiCache cluster: " + regionName + " :: " + cacheClusterId);

      // We are only interested in memcached
      List<CacheFactory> cacheFactories = Lists.newArrayList();
      cacheFactories.add(memcachedCacheFactory);

      // Set up the AWS ElastiCache client
      AmazonElastiCacheClient amazonElastiCacheClient = new AmazonElastiCacheClient();
      amazonElastiCacheClient.setRegion(RegionUtils.getRegion(regionName));

      // Use the factory to produce the cache
      ElastiCacheFactoryBean elastiCacheFactoryBean = new ElastiCacheFactoryBean(amazonElastiCacheClient, cacheClusterId, cacheFactories);
      elastiCacheFactoryBean.afterPropertiesSet();
      return elastiCacheFactoryBean.getObject();
    }
  }

  @Bean(destroyMethod = "stop")
  @Lazy
  MemcachedExecutable embeddedMemcachedExe() {
    final Command command = Command.MemcacheD;
    // Hardcode the download url. Using the value from: https://github.com/flapdoodle-oss/de.flapdoodle.embed.memcached/blob/master/server.properties
    IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder()
        .defaults(command)
        .artifactStore(
            new ArtifactStoreBuilder()
                .defaults(command)
                .download(new DownloadConfigBuilder()
                    .defaultsForCommand(command)
                    .downloadPath("http://heli0s.darktech.org/memcached/")))
        .build();

    MemcachedStarter runtime = MemcachedStarter.getInstance(runtimeConfig);
    return runtime.prepare(new MemcachedConfig(Version.Main.V1_4, EMBEDDED_PORT));
  }

  @Bean(destroyMethod = "stop")
  @Lazy
  MemcachedProcess embeddedMemcachedProcess(MemcachedExecutable embeddedMemcachedExe) throws IOException {
    return embeddedMemcachedExe.start();
  }

}

The @Lazy annotation ensures that the embedded beans and the cache itself are only instantiated in those environments where they are required.
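To picture what lazy instantiation buys us here, the following is a minimal plain-Java sketch, not how Spring implements @Lazy. LazyBean and LazyBeanDemo are hypothetical names, and the "server start" is just a counter. The point is that the factory only runs when something actually asks for the bean, which in our setup only happens when the cluster id is “localhost”.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Memoising supplier: runs the factory on the first get() and reuses the result afterwards. */
final class LazyBean<T> implements Supplier<T> {
  private final Supplier<T> factory;
  private T instance;
  private boolean initialised;

  LazyBean(Supplier<T> factory) {
    this.factory = factory;
  }

  @Override
  public synchronized T get() {
    if (!initialised) {
      instance = factory.get();
      initialised = true;
    }
    return instance;
  }
}

final class LazyBeanDemo {
  /** Only the "localhost" branch touches the lazy embedded server. */
  static String connect(String clusterId, Supplier<String> embedded) {
    return "localhost".equals(clusterId) ? embedded.get() : "elasticache:" + clusterId;
  }

  public static void main(String[] args) {
    AtomicInteger starts = new AtomicInteger();
    // Hypothetical stand-in for starting the embedded memcached process
    Supplier<String> embedded = new LazyBean<>(() -> {
      starts.incrementAndGet();
      return "embedded-memcached";
    });

    System.out.println(connect("beam-service-cache", embedded) + ", starts=" + starts.get());
    System.out.println(connect("localhost", embedded) + ", starts=" + starts.get());
  }
}
```

With a production-style cluster id the factory is never invoked, so nothing is downloaded or started.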

The configuration depends on a few properties, which are the only things that change between local, production and everything in between. If the cluster id is set to “localhost”, a local embedded memcached server is started (and stopped) automatically, which allows use in local and test environments. For production, stage and other environments, the appropriate ElastiCache cluster can be specified. For example:

Local:

aws.elasticache.cluster.id=localhost
aws.elasticache.expiry.seconds=3600

Production:

aws.elasticache.cluster.id=beam-service-cache
aws.elasticache.expiry.seconds=86400
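As a rough illustration of how these two properties drive the switch, here is a small plain-Java sketch that parses them and decides whether the embedded server applies. CacheSettings is a hypothetical helper, not part of our code or of Spring Cloud AWS; in the real application Spring injects the values via @Value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the two cache properties shown above. */
final class CacheSettings {
  final String clusterId;
  final int expirySeconds;

  CacheSettings(String clusterId, int expirySeconds) {
    this.clusterId = clusterId;
    this.expirySeconds = expirySeconds;
  }

  /** True when the embedded memcached server should be used instead of ElastiCache. */
  boolean isEmbedded() {
    return "localhost".equals(clusterId);
  }

  /** Parses properties text and fails fast if a required key is missing. */
  static CacheSettings parse(String propertiesText) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    String clusterId = require(props, "aws.elasticache.cluster.id");
    int expiry = Integer.parseInt(require(props, "aws.elasticache.expiry.seconds"));
    return new CacheSettings(clusterId, expiry);
  }

  private static String require(Properties props, String key) {
    String value = props.getProperty(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing property: " + key);
    }
    return value.trim();
  }

  public static void main(String[] args) {
    CacheSettings local = parse(
        "aws.elasticache.cluster.id=localhost\naws.elasticache.expiry.seconds=3600\n");
    CacheSettings prod = parse(
        "aws.elasticache.cluster.id=beam-service-cache\naws.elasticache.expiry.seconds=86400\n");
    System.out.println("local embedded? " + local.isEmbedded());
    System.out.println("prod embedded? " + prod.isEmbedded());
  }
}
```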

For the embedded memcached server we use the flapdoodle embedded memcached library, see: https://github.com/flapdoodle-oss/de.flapdoodle.embed.memcached

The next issue is that the cache name needs to be configured at runtime, as we can’t use constants in the @Cacheable annotation with this config. We implement a CacheResolver that points everything at the “beamCache” bean configured in the factory above. Switching to Scala now:

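import java.util

import com.google.common.collect.Sets
import org.springframework.beans.factory.annotation.{Autowired, Qualifier}
import org.springframework.cache.Cache
import org.springframework.cache.interceptor.{CacheOperationInvocationContext, CacheResolver}
import org.springframework.stereotype.Component

/**
 * Cache resolver that points everything at the same cache, which is typically the ElastiCache cluster
 */
@Component
class FixedCacheResolver @Autowired()(@Qualifier("beamCache") cache: Cache) extends CacheResolver {

  override def resolveCaches(context: CacheOperationInvocationContext[_]): util.Collection[_ <: Cache] =
    Sets.newHashSet(cache)

}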

We can then use this annotation to enable caching on methods in a service or cache wrapper component:

@CacheConfig(cacheNames = Array("beamCache"), cacheResolver = "fixedCacheResolver", keyGenerator = "beamKeyGenerator")

With methods along the lines of this to do the caching:

@Cacheable
override def findStoreGroupsForStore(storeId: Long): Set[StoreGroup] = {
  log.debug(s"Cache miss: findStoreGroupsForStore($storeId)")
  ... expensive evaluation ...
}

@CacheEvict
override def evictStore(storeId: Long): Unit = {
  // no-op - taken care of by the annotation and the magic of BeamKeyGenerator
  log.debug(s"Evicted Store($storeId) from the cache")
}
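To make the behaviour of the two annotations concrete, here is a hand-rolled plain-Java sketch of roughly what they give us: a lookup that only runs the expensive evaluation on a miss, and an evict method whose body does nothing except remove the entry. StoreGroupCacheSketch, the in-memory map and the use of strings for store groups are illustrative assumptions; the real cache is memcached behind Spring's cache abstraction.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

/** Illustrative stand-in for a @Cacheable lookup paired with a @CacheEvict method. */
final class StoreGroupCacheSketch {
  private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();
  private final AtomicInteger misses = new AtomicInteger();
  private final LongFunction<Set<String>> loader;

  StoreGroupCacheSketch(LongFunction<Set<String>> loader) {
    this.loader = loader;
  }

  /** Both methods must agree on the key, otherwise eviction removes nothing. */
  private static String key(long storeId) {
    return "storeGroupsForStore:" + storeId;
  }

  /** Like @Cacheable: the loader only runs when the key is absent. */
  Set<String> findStoreGroupsForStore(long storeId) {
    return cache.computeIfAbsent(key(storeId), k -> {
      misses.incrementAndGet();
      return loader.apply(storeId);
    });
  }

  /** Like @CacheEvict: removing the entry is the whole job. */
  void evictStore(long storeId) {
    cache.remove(key(storeId));
  }

  int misses() {
    return misses.get();
  }

  public static void main(String[] args) {
    StoreGroupCacheSketch cache =
        new StoreGroupCacheSketch(id -> Collections.singleton("group-" + id));
    cache.findStoreGroupsForStore(7L); // miss
    cache.findStoreGroupsForStore(7L); // hit
    cache.evictStore(7L);
    cache.findStoreGroupsForStore(7L); // miss again after eviction
    System.out.println("misses = " + cache.misses());
  }
}
```

The sketch also shows why the lookup and the eviction have to produce the same key for a given store.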

The magic of “beamKeyGenerator” is that it generates consistent cache keys for different methods and parameters, and also takes the environment into account, so that, for example, the uat and stage environments can point at the same cache cluster.

import java.lang.reflect.Method

import org.slf4j.LoggerFactory // assuming SLF4J is the logging API in use
import org.springframework.beans.factory.annotation.{Autowired, Value}
import org.springframework.cache.interceptor.KeyGenerator
import org.springframework.stereotype.Component

@Component
class BeamKeyGenerator @Autowired()(@Value("${beam.environment}") environment: String) extends KeyGenerator {

  private val log = LoggerFactory.getLogger(classOf[BeamKeyGenerator])

  private val storeGroupsForStore = "storeGroupsForStore"

  override def generate(target: scala.Any, method: Method, params: AnyRef*): AnyRef = {
    // The lookup and the eviction must map to the same key for a given store
    val m = method.getName match {
      case "findStoreGroupsForStore" => storeGroupsForStore + ":" + params.head
      case "evictStore" => storeGroupsForStore + ":" + params.head
      case _ => method.getName + ":" + params.mkString(",")
    }

    // We prefix with the environment so we can run with the same uber cache in multiple environments
    val key = environment + ":" + m
    log.debug(s"Generated cache key: $key for ${method.getName}")
    key
  }

}
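For readers who want to check the key scheme without a Spring context, here is a plain-Java sketch of the same rules. EnvKeys is a hypothetical helper that takes the method name as a string rather than a java.lang.reflect.Method. It is only meant to show the resulting keys.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical, Spring-free version of the key rules used by BeamKeyGenerator. */
final class EnvKeys {
  private static final String STORE_GROUPS = "storeGroupsForStore";

  static String generate(String environment, String methodName, Object... params) {
    String suffix;
    switch (methodName) {
      // Lookup and eviction share one key so that evictStore removes what the lookup stored
      case "findStoreGroupsForStore":
      case "evictStore":
        suffix = STORE_GROUPS + ":" + params[0];
        break;
      default:
        suffix = methodName + ":"
            + Arrays.stream(params).map(p -> String.valueOf(p)).collect(Collectors.joining(","));
    }
    // The environment prefix keeps environments apart on a shared cluster
    return environment + ":" + suffix;
  }

  public static void main(String[] args) {
    System.out.println(generate("uat", "findStoreGroupsForStore", 42L));
    System.out.println(generate("uat", "evictStore", 42L));
    System.out.println(generate("stage", "findStoreGroupsForStore", 42L));
  }
}
```

The first two calls yield the same key (uat:storeGroupsForStore:42), while the stage key differs only in its prefix, so the two environments never read each other's entries.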