May 18, 2017

ApacheCon / Apache BigData - Day 2

Here is my conference coverage for ApacheCon and Apache BigData NA 2017 day 2. See day 1 coverage here.

Apache Ignite
Like last year in Vancouver, Apache Ignite is again a big thing. It's really an amazing piece of technology. Here's the feature puzzle of Apache Ignite:
At the conference the following Ignite topics were covered for the recently released version 2.0:

SQL Grid
Ignite supports ANSI SQL 99 compliant access to the data within a memory grid. It even supports the tricky things like (distributed) joins and groupings, full-text search within the data model, and geo-spatial queries. The data is always consistent and transactions are ACID, even if Ignite acts as a read-through/write-through cache for a relational database. This is a very interesting use case, as it allows Ignite to act as a caching SQL proxy in front of a relational database. Ignite SQL can be accessed via its own JDBC and ODBC drivers as well as via the Ignite SQL API. The relational data model within Ignite can be described and modified with SQL DDL and DML as well as with code annotations and XML configuration. The relational data model can also be imported from relational databases. Indexes are stored in-memory (off-heap) as B+ trees.
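To illustrate the SQL API side, here is a minimal sketch of a distributed join with grouping; the cache, table and column names are made up, and the caches are assumed to already exist with a SQL schema configured:

import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;

public class IgniteSqlSketch {
    public static void main(String[] args) {
        // Start an Ignite node with default configuration and query an existing cache.
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<?, ?> persons = ignite.cache("personCache");

            // ANSI SQL with a (distributed) join and grouping via the Ignite SQL API.
            SqlFieldsQuery query = new SqlFieldsQuery(
                "SELECT c.name, COUNT(p.id) " +
                "FROM Person p JOIN \"companyCache\".Company c ON p.companyId = c.id " +
                "GROUP BY c.name");

            for (List<?> row : persons.query(query).getAll()) {
                System.out.println(row.get(0) + ": " + row.get(1));
            }
        }
    }
}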

Streaming
With data streamers you can import data into an Ignite cluster as a stream with automatic partitioning support. Prebuilt data streamers for Kafka, RocketMQ, sockets, JMS, MQTT and others are available. On the processing side, continuous SQL queries can be run on sliding windows.
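As a minimal sketch of the ingest side (the cache name and value type are made up), the IgniteDataStreamer batches entries and routes them automatically to the owning partitions:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class StreamerSketch {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            ignite.getOrCreateCache("events");

            // The streamer buffers entries and distributes them across the cluster.
            try (IgniteDataStreamer<Long, String> streamer = ignite.dataStreamer("events")) {
                streamer.allowOverwrite(true);
                for (long i = 0; i < 100_000; i++) {
                    streamer.addData(i, "event-" + i);
                }
            } // close() flushes the remaining buffered entries
        }
    }
}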

Web Console
There is a web console for Apache Ignite available for query execution, result visualization and monitoring. It also provides a schema import wizard from relational databases. 

File System
Ignite provides an in-memory file system which implements the Hadoop FileSystem API. So it can be used as an HDFS or Alluxio replacement for {Hadoop, Spark, Flink}. In this scenario it can also act as a caching layer between {Hadoop, Spark, Flink} and a real (and persistent) HDFS.
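A hedged sketch of how a Hadoop client could be pointed at the Ignite file system instead of HDFS; the property name, implementation class and igfs:// URI are taken from the Ignite Hadoop Accelerator documentation from memory, so double-check them against your Ignite version:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IgfsClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Register the Ignite implementation for the igfs:// scheme (assumed property/class names).
        conf.set("fs.igfs.impl", "org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem");

        FileSystem fs = FileSystem.get(URI.create("igfs://igfs@localhost:10500/"), conf);
        fs.mkdirs(new Path("/tmp/demo"));
        System.out.println(fs.exists(new Path("/tmp/demo")));
    }
}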

Ignite 2.1
Ignite 2.1 will be released in the coming months. The big new thing will be its own high-performance persistent storage implementation, so that durable scenarios can be supported without relying on external persistent storage solutions.

Btw.: Ignite claims to be way faster than Hazelcast, and an Ignite book has just been completed.

Presto
When it comes to interactive analysis of big data, Facebook's Presto seems to be the jack of all trades. It supports full ANSI SQL (including joins), has its own JDBC driver and Tableau web connector, and can connect to various data sources like files within HDFS in formats like Parquet and ORC as well as other persistent stores like Cassandra, Hive, PostgreSQL, and Redis. Presto can be extended with UDFs and provides enterprise-grade features like Kerberos and LDAP authentication and secured cluster-internal communication. Presto is maintained by a solid community and has a broad user base. There's also a nice web interface for Presto available from Airbnb. Besides Facebook, Teradata also contributes to Presto with about 20 developers and provides its own Presto distribution with enterprise support.
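A minimal sketch of querying Presto through its JDBC driver; the coordinator host, catalog ("hive"), schema and table are placeholders, and the presto-jdbc jar is assumed to be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class PrestoQuerySketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "analyst"); // Presto requires a user name

        try (Connection conn = DriverManager.getConnection(
                 "jdbc:presto://presto-coordinator:8080/hive/default", props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT country, count(*) AS cnt FROM orders GROUP BY country ORDER BY cnt DESC")) {
            while (rs.next()) {
                System.out.println(rs.getString("country") + ": " + rs.getLong("cnt"));
            }
        }
    }
}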

IoT
Apache is very busy providing an open source IoT stack on top of Apache Mynewt, a real-time operating system (RTOS) for low-level devices (Cortex M0-M4, MIPS, RISC-V) with included device management features like build and package management, remote firmware upgrade, a secure bootloader and signed images.


The incubating Apache Edgent provides analytics capabilities at the edge, from the cloud to the IoT fog.

May 17, 2017

ApacheCon / Apache BigData - Day 1

The Apache Foundation event management team is really excellent at choosing venues for their conferences. After Vancouver, BC last year, this year's ApacheCon and Apache BigData take place in beautiful Miami, FL. Here is my conference coverage of day 1. See day 2 coverage here.

Notebooks
Notebooks for data analysis are very en vogue. Apache Zeppelin and Jupyter are the superheroes in that area. PixieDust is a nice extension to Jupyter providing easy-to-use data visualization primitives. Helium is a new plugin system and package repository for Zeppelin providing various ready-to-use Zeppelin extensions (visualizations, interpreters, spells).

Cloud
Basically no surprise, but surprisingly intensive, is the promotion of Apache CloudStack as an open source IaaS platform and competitor to OpenStack. I thought this war was over and OpenStack was the clear winner - but Apache doesn't want to capitulate.

Flink and Spark ... and Beam
Flink seems to be at eye level with Spark. Each time Spark is mentioned, Flink is mentioned as well. Apache Beam is also very well covered at the conference, providing an abstraction layer atop of both. But concerning Apache Beam, I'm very suspicious of abstraction frameworks on top of abstraction frameworks. Beam is also an abstraction for Google Cloud Dataflow, so it may also exist to give Google a "no vendor lock-in" argument. Btw.: Google is one of the most contributing companies to Beam.
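To make the abstraction idea concrete, here is a small word-count sketch with the Beam Java SDK; the input/output file names are placeholders, and the runner (Direct, Flink, Spark, Dataflow) is chosen purely via the pipeline options:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCountSketch {
    public static void main(String[] args) {
        // The runner is selected via --runner=... on the command line, e.g. FlinkRunner or SparkRunner.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.read().from("input.txt"))
                .apply(FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split("\\s+"))))
                .apply(Count.perElement())
                .apply(MapElements.into(TypeDescriptors.strings())
                        .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
                .apply(TextIO.write().to("word-counts"));

        pipeline.run().waitUntilFinish();
    }
}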

Messaging
There are two new players around in the field of messaging systems. In the range between Kafka and classical messaging systems like ActiveMQ and RabbitMQ, RocketMQ sits just in the middle. RocketMQ is an open source contribution of Alibaba - one of the largest web-scale companies on earth. You can find a nice comparison chart of RocketMQ with Kafka and ActiveMQ here. RocketMQ provides more guarantees than Kafka, like strict ordering, but at a price: it's based on a master/slave architecture, so it's not as scalable as Kafka. But compared with ActiveMQ and RabbitMQ it has a significantly higher throughput by leveraging the pull/distributed log principle of Kafka. As RocketMQ also provides a JMS interface, it could be at a real sweet spot between Kafka and ActiveMQ/RabbitMQ. Apache DistributedLog is not a full-fledged messaging solution but a building block for one. It provides a distributed log implementation - e.g. Kafka is also based on a distributed log. Allegro open-sourced Hermes, a message broker on top of Kafka, extending Kafka with REST publisher/consumer interfaces, message tracing and monitoring, and guaranteed message delivery, at a sub-millisecond overhead on top of Kafka.
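As a minimal sketch of the RocketMQ Java API (producer group, topic, tag and name server address are placeholders), a synchronous producer looks roughly like this:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OrderEventProducerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
        producer.setNamesrvAddr("localhost:9876"); // the name server locates the brokers
        producer.start();

        Message message = new Message("order-events", "created", "order-4711".getBytes("UTF-8"));
        SendResult result = producer.send(message); // synchronous send
        System.out.println(result.getSendStatus());

        producer.shutdown();
    }
}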

Hardware Diversification
Spark and others are being prepared to support diverse hardware like GPUs, TPUs and non-volatile / durable RAM - including a talk on the QAware research project "how to leverage the GPU on Spark". There is also a native library from Intel (the Math Kernel Library) which claims to speed up ML use cases on Spark by 9x at no additional cost.

Dataservices
Dataservices are a new way to process data and an alternative to Spark and Flink if you want to implement and run data processing applications atop of a microservice platform. I did a talk on how to implement dataservices with Spring Cloud Data Flow.
Others proposed to use a serverless framework like OpenWhisk to implement dataservices.
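One way to read "dataservice" here is a small Spring Boot app built with Spring Cloud Stream that gets registered in Spring Cloud Data Flow and composed into streams. A minimal processor sketch (the application and destination names are made up):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class UppercaseProcessorApplication {

    // Consumes from the bound input destination and publishes the result to the bound output destination.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String transform(String payload) {
        return payload.toUpperCase();
    }

    public static void main(String[] args) {
        SpringApplication.run(UppercaseProcessorApplication.class, args);
    }
}

Registered as an app in Spring Cloud Data Flow, such a processor can then be composed into a stream definition like http | uppercase | log.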

Jan 23, 2017

Setting up a distributed Ehcache with Mule ESB Community Edition

Setting up a distributed Ehcache on Mule ESB Community Edition is in fact quite simple and can be achieved in a few steps. After creating an Ehcache configuration, we set up a cache manager managed by Spring. We then use the previously defined caches in our Mule configuration together with a cache key extractor in a custom caching interceptor.

Related

If you have Mule Enterprise Edition, check out the Caching Scope which allows caching of predefined blocks inside flows. Ehcache also provides distributed caching with Terracotta and BigMemory Max.

Prerequisites

In our example we are using Mule 3.8.0 together with Ehcache 2.6.3 and Spring 4.1.6 inside a Glassfish 4 server. Mule is configured using XML configuration files.

Setting up the distributed Ehcache

The Ehcache is configured using ehcache.xml configuration files which consist at least of a list of cache configurations. For the distributed cache, we also need a peer provider, a peer listener and an event listener for each cache.

  • peer provider: locates other peers and manages a list of peers belonging to the distributed cache
  • peer listener: listens for incoming cache changes
  • cache event listener: listens for local cache changes and distributes changes to other peers

First, we set up the peer provider which locates other peers in the network and manages a list of peers which belong to the distributed cache:

<cacheManagerPeerProviderFactory class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory" properties="peerDiscovery=automatic, multicastGroupAddress=224.0.0.1, multicastGroupPort=22401"/>

The peer discovery can either be done in automatic mode using multicast (as listed above) or in manual mode explicitly specifying the remote peer addresses. The latter approach is usually safer for company networks or data centers, but requires a lot of lines of configuration when using more than just a few caches and server instances. In automatic mode, the peer provider sends multicast messages to all server instances in the multicast group and tells them about its caches and the port on which the peer listener (see below) listens for incoming cache changes.
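For manual discovery, the remote peers and their caches are listed explicitly instead; a sketch with a placeholder host name and the cache names used later in this post:

<cacheManagerPeerProviderFactory class="net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory" properties="peerDiscovery=manual, rmiUrls=//server2:40001/fooServiceCache|//server2:40001/barServiceCache"/>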

Together with the peer provider, we need a peer listener which listens for incoming cache changes:

<cacheManagerPeerListenerFactory class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory" properties="socketTimeoutMillis=2000"/>

If we do not specify a port as in the example above, Ehcache automatically chooses a high numbered port which is still unused. For company networks or data centers, you might want to specify the port explicitly. When using automatic peer discovery, the information about which server instance uses which port is distributed by the peer provider over multicast messages. In case of a manual peer discovery, the addresses and ports have already been stated explicitly in the peer provider configuration.

Finally, each cache needs a cache event listener (defined inside its cache tag) which distributes cache changes such as new cache entries to the remote peers:

<cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
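Putting the pieces together, a replicated cache definition inside ehcache.xml could look like this (the sizing and TTL values are just placeholders):

<cache name="fooServiceCache" maxElementsInMemory="10000" eternal="false" timeToLiveSeconds="300">
    <cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
</cache>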

Also see the Ehcache Replication Guide which is still available on the Ehcache.org website.

Configuring the cache manager

Next, we configure an Ehcache manager in our Spring configuration file. We will use the cache manager later to retrieve the caches and use them in our caching interceptor.

<bean id="ehcacheManager" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
    <property name="configLocation" value="classpath:/ehcache.xml"/>
</bean>
<!-- optional: -->
<bean id="ehcacheCacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
    <property name="cacheManager" ref="ehcacheManager"/>
</bean>
<cache:advice id="ehcacheAdvice" cache-manager="ehcacheCacheManager"/>

The cache manager uses the previously defined ehcache.xml files. This is usually a good place to define separate configuration files for your environments e.g. using ${ } property expansion. For example you might want to disable the distributed cache when testing locally or define different addresses or ports for your production environment. Note that the configLocation is a Spring resource, so if you want to point it to a file not in the classpath, use the file: prefix, e.g. file:/path/to/ehcache.xml.

If we also want to use our caches elsewhere with Spring, it is a good idea to define a Spring cache manager (in the example above called ehcacheCacheManager) and an appropriate cache advice.

An interface for cache key extractors

In order to provide a cache key to our caching interceptor for every service, we implement a cache key extractor defined by a simple extractor interface.

public interface CacheKeyExtractor {
    Object extractKeyFrom(MuleEvent event);
}

For each service, we implement a concrete cache key extractor. An extractor could e.g. analyse the payload of the request, parse it and extract the relevant information that is a viable cache key. Since we pass the MuleEvent, we also have access to inbound, outbound and session properties set by Mule or could retrieve other information from our Spring context.

In order to be able to access our cache key extractor implementations in the Mule configuration files, we define them as Spring components (e.g. @Component("fooServiceCacheKeyExtractor")) and give them a unique name for simple usage.
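As an illustration, a concrete extractor could look like the following sketch; the inbound property and the key format are hypothetical and only meant to show the idea:

import org.mule.api.MuleEvent;
import org.springframework.stereotype.Component;

@Component("fooServiceCacheKeyExtractor")
public class FooServiceCacheKeyExtractor implements CacheKeyExtractor {

    @Override
    public Object extractKeyFrom(MuleEvent event) {
        try {
            // Hypothetical key: an inbound property plus a digest of the request payload.
            Object tenantId = event.getMessage().getInboundProperty("tenantId");
            String payload = event.getMessage().getPayloadAsString();
            return tenantId + ":" + payload.hashCode();
        } catch (Exception e) {
            return null; // no key -> the caching interceptor simply skips caching
        }
    }
}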

Implementing the caching interceptor

The last component needed for a working cache is the caching interceptor. It is implemented as a custom Mule interceptor. On a cache hit, further execution of the flow is stopped and the cached payload is returned. On a cache miss, the flow continues and the result of the execution is put into the cache. Logging messages and documentation are stripped from the following example code.

import java.io.IOException;

import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.interceptor.Interceptor;
import org.mule.api.processor.MessageProcessor;
import org.springframework.stereotype.Component;

@Component
public class CachingInterceptor implements Interceptor {
    private static final String HTTP_STATUS_OK = "200";
    private Ehcache cache = null;
    private MessageProcessor next = null;
    private CacheKeyExtractor extractor = null;

    @Override
    public MuleEvent process(MuleEvent event) throws MuleException {
        Object cacheKey = extractor.extractKeyFrom(event);
        if (cacheKey == null) {
            return next.process(event);
        }
        Element cachedElement = cache.get(cacheKey);
        if (cachedElement == null) {
            // cache miss
            return updateCache(cacheKey, event);
        } else {
            // cache hit
            return lookupCache(cachedElement, event);
        }
    }

    private MuleEvent updateCache(Object cacheKey, MuleEvent event) throws MuleException {
        // invoke the intercepted processor
        MuleEvent result = next.process(event);
        String status = result.getMessage().getInboundProperty("http.status");
        if (!HTTP_STATUS_OK.equals(status)) {
            return result;
        }
        // cache the payload of the intercepted processor
        try {
            byte[] payload = result.getMessage().getPayloadAsBytes();
            if (payload != null) {
                cache.put(new Element(cacheKey, payload));
            }
        } catch (IOException e) {
            // stripped: log that the payload could not be read; the result is returned uncached
        } catch (Exception e) {
            // stripped: log the unexpected error; caching is skipped but the flow continues
        }
        return result;
    }

    private MuleEvent lookupCache(Element cachedElement, MuleEvent event) throws MuleException {
        // extract the cached payload
        try {
            Object payload = cachedElement.getObjectValue();
            MuleMessage cachedMessage = new DefaultMuleMessage(payload, event.getMessage(), event.getMuleContext());
            return new DefaultMuleEvent(cachedMessage, event);
        } catch (Exception e) {
            // stripped: log that the cached element could not be turned into a message
            cachedElement.setTimeToLive(0);
            return next.process(event);
        }
    }

    @Override
    public void setListener(MessageProcessor messageProcessor) {
        next = messageProcessor;
    }
    
    public void setCache(Ehcache cache) { this.cache = cache; }
    public void setCacheKeyExtractor(CacheKeyExtractor extractor) {
        this.extractor = extractor;
    }
}

The caching interceptor can now be used in our Mule flows.

Configuring Mule

The Mule configuration is now simple. We first need access to our caches so we can insert them into the caching interceptor. The Spring EhCacheFactoryBean already provides the extraction of caches from our previously defined cache manager.

<beans:beans>
    <beans:bean id="fooServiceCache" class="org.springframework.cache.ehcache.EhCacheFactoryBean">
        <beans:property name="cacheName" value="fooServiceCache"/>
        <beans:property name="cacheManager" ref="ehcacheManager"/>
    </beans:bean>

    <beans:bean id="barServiceCache" class="org.springframework.cache.ehcache.EhCacheFactoryBean">
        <beans:property name="cacheName" value="barServiceCache"/>
        <beans:property name="cacheManager" ref="ehcacheManager"/>
    </beans:bean>
</beans:beans>

In our flows, we can now insert the custom caching interceptor. The interceptor is configured with the cache to be used (the name must match the one in the ehcache.xml file) and a cache key extractor that knows how to extract a cache key for this specific service. Since we defined the extractor as a named Spring bean, we can now easily inject it here. On a side note, a more sophisticated implementation of the caching interceptor could also e.g. find the extractor by some name magic using Spring. The message processor listener, which is also needed by the caching interceptor, is automatically set by Mule.

<flow name="fooService">
    <inbound-endpoint ref="foo-service-inbound-endpoint"/>
    <!-- ... -->
    <custom-interceptor class="de.qaware.caching.CachingInterceptor">
        <beans:property name="cache" ref="fooServiceCache"/>
        <beans:property name="cacheKeyExtractor" ref="fooServiceCacheKeyExtractor"/>
    </custom-interceptor>
    <!-- ... -->
    <outbound-endpoint ref="foo-service-outbound-endpoint"/>
</flow>

And that’s it. Calls to our foo service are now cached and distributed to our other nodes. Subsequent calls to our foo service should now be answered faster.

Troubleshooting

If you have problems with the Ehcache configuration, first make sure that the correct ehcache.xml file is loaded. Spring and Ehcache will switch to a default failsafe configuration in case of errors, which will lead you onto the wrong trail. Also have a look at the Ehcache log messages at debug log level. Ehcache should print a lot of peer discovery messages in automatic mode and give you a hint on problems with your configuration. In case of problems with Mule, also have a look at the log messages in debug mode; they are quite verbose.