Thursday, November 22, 2018

A Standalone REST server in Java

Writing a standalone REST server, i.e., a REST server that runs outside WildFly, Tomcat, or any other container, is not too difficult in Java. The part I had trouble with was figuring out the right dependencies in Maven. I will go straight to the code, as I think it is self-explanatory. I have a class that provides the "students" service, which basically keeps a list of students' names. It is basic, to say the least, because I keep the list in a static property even though the server may have several threads. Real implementations would probably be backed by some sort of database seen by all threads, but for this example this suffices...

package is.project3.rest;

import java.util.ArrayList;
import java.util.List;

import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/students")
public class StudentsKeeper {
 //XXX: we have several threads...
 private static List<String> students = new ArrayList<>();
 
 @POST
 public void addStudent(String student) {
  System.out.println("Called post addStudent with parameter: " + student);
  System.out.println("Thread = " + Thread.currentThread().getName());
  students.add(student);
 }

 @GET
 @Produces(MediaType.APPLICATION_JSON)
 public List<String> getAllStudents() {
  System.out.println("Called getAllStudents");
  System.out.println("Thread = " + Thread.currentThread().getName());
  return students;
 }

 @Path("xpto")
 @GET
 @Produces(MediaType.APPLICATION_JSON)
 public String getAllStudents2() {
  System.out.println("Called getAllStudents 2");
  System.out.println("Thread = " + Thread.currentThread().getName());
  return "test";
 }

}
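As a side note on the XXX comment above: since the server may handle requests from several threads, the shared list is not thread-safe as written. A minimal improvement (a sketch, not what the code above does) would be to use a synchronized wrapper:

 //Sketch: thread-safe wrapper around the shared list (requires java.util.Collections)
 private static final List<String> students = Collections.synchronizedList(new ArrayList<>());

Even then, returning the list directly from getAllStudents() exposes it to concurrent iteration while it is being serialized, so returning a copy (new ArrayList<>(students)) would be safer. As said above, a real implementation would use a database anyway.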

The server is surprisingly simple:

package is.project3.rest;


import java.net.URI;

import javax.ws.rs.core.UriBuilder;

import org.glassfish.jersey.jdkhttp.JdkHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;

public class MyRESTServer {

 private final static int port = 9998;
 private final static String host="http://localhost/";

  public static void main(String[] args) {
  URI baseUri = UriBuilder.fromUri(host).port(port).build();
  ResourceConfig config = new ResourceConfig(StudentsKeeper.class);
  JdkHttpServerFactory.createHttpServer(baseUri, config);
 }
}
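By the way, JdkHttpServerFactory.createHttpServer() returns the underlying com.sun.net.httpserver.HttpServer, so if you ever need to stop the server cleanly (for example, at the end of a test), you can keep the reference, roughly like this:

  //Sketch: keeping the server reference so that we can stop it later
  HttpServer server = JdkHttpServerFactory.createHttpServer(baseUri, config);
  //... later, when shutting down:
  server.stop(0);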


You just need to run it and it will immediately be serving a REST web service on port 9998; more about this ahead. The troubling part was getting Maven right, considering that it had to transparently convert a List of Strings to JSON. This is the pom.xml I ended up with:

<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>is</groupId>
 <artifactId>project3</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>project3</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>10</java.version>
 </properties>

 <dependencies>
  <!-- https://mvnrepository.com/artifact/junit/junit -->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>

  <!-- https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api -->
  <dependency>
   <groupId>javax.xml.bind</groupId>
   <artifactId>jaxb-api</artifactId>
   <version>2.3.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.sun.xml.bind/jaxb-core -->
  <dependency>
   <groupId>com.sun.xml.bind</groupId>
   <artifactId>jaxb-core</artifactId>
   <version>2.3.0.1</version>
  </dependency>
  <dependency>
   <groupId>com.sun.xml.bind</groupId>
   <artifactId>jaxb-impl</artifactId>
   <version>2.3.0.1</version>
  </dependency>
  <dependency>
   <groupId>javax.activation</groupId>
   <artifactId>activation</artifactId>
   <version>1.1.1</version>
  </dependency>


  <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.media/jersey-media-json-jackson -->
  <dependency>
   <groupId>org.glassfish.jersey.media</groupId>
   <artifactId>jersey-media-json-jackson</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-grizzly2-http</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-grizzly2-servlet</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-jdk-http</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-simple-http</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-jetty-http</artifactId>
   <version>2.27</version>
  </dependency>

  <dependency>
   <groupId>org.glassfish.jersey.containers</groupId>
   <artifactId>jersey-container-jetty-servlet</artifactId>
   <version>2.27</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.inject/jersey-hk2 -->
  <dependency>
   <groupId>org.glassfish.jersey.inject</groupId>
   <artifactId>jersey-hk2</artifactId>
   <version>2.27</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
  <dependency>
   <groupId>javax.ws.rs</groupId>
   <artifactId>javax.ws.rs-api</artifactId>
   <version>2.1.1</version>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
    <configuration>
     <release>${java.version}</release>
    </configuration>
   </plugin>
  </plugins>
 </build>

</project>


We can use curl to post new students and to get the results back. If you don't have curl on the command line, you may just skip this step. To post new students, we can do the following:

curl -d "Joao" -X POST http://localhost:9998/students
curl -d "Joana" -X POST http://localhost:9998/students

to get the list back:

curl -X GET http://localhost:9998/students/

and the result is:

["Joao","Joana"]

Now, let's build a client that can POST to and GET from the service as well.

package is.project3.rest;

import java.util.List;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;


public class MyRESTClient {
 public static void main(String[] args) {
  Client client = ClientBuilder.newClient();
  WebTarget webTarget = client.target("http://localhost:9998/students");

  webTarget.request().post(Entity.entity("Jose", MediaType.TEXT_PLAIN));

  Invocation.Builder invocationBuilder =  webTarget.request(MediaType.APPLICATION_JSON);
  @SuppressWarnings("unchecked")
  List<String> response = invocationBuilder.get(List.class);
  
  response.forEach(System.out::println);
 }
}

And we get our final result:

Joao
Joana
Jose
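As a side note, the unchecked cast on List.class can be avoided with a GenericType, which is the JAX-RS way of reading a parameterized type. A small sketch of the same GET call (requires javax.ws.rs.core.GenericType):

  List<String> students = invocationBuilder.get(new GenericType<List<String>>() {});
  students.forEach(System.out::println);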

That's it!

Playing with Kafka Streams

According to its own site, "Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters". In simplified terms, Kafka is a publish-subscribe system oriented towards stream processing. We can think of Kafka as a kind of crossroads where information travels from one application to another, for example from a large-scale distributed microservice application to a monitoring system.

In this post I assume that you have been able to start Kafka and that it is running on your localhost on port 9092, together with Zookeeper, which is available on port 2181. There are sites dedicated to running Kafka, so I will overlook that issue.

I will solve a couple of exercises:

  • Counting the occurrences of each key
  • Converting the output from Long to String
  • Reduce()
  • Materialized views
  • Windowed streams

Overview

In the end we want to have the following arrangement:

[Figure: a producer writes to the input topic, the Kafka Streams application reads it and writes its results to resultstopic, and a dedicated consumer reads from resultstopic.]

Here, a producer is writing content to the topic, a dedicated application is reading the data from the topic (possibly in parallel with other applications) and outputting results to a second topic. In this second topic, we will have a dedicated consumer waiting to get the results computed by the streams application (in the middle).

To reach this setting, we start from the right. We can resort to shell applications provided with Kafka, to run a consumer waiting on the resultstopic topic. I did it with the following command:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic resultstopic

Note that the exact way of running this command depends on how you installed Kafka. It might also happen that it fails to work, as I saw on some of the students' computers. In that case you might want to create a consumer yourself (a minimal sketch follows), or resort to other tutorials on Kafka.
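A minimal Java consumer along these lines should do the job (a sketch; the class name, group id and poll interval are arbitrary choices of mine, and the String deserializers assume the later examples, where the results are written as Strings):

package is.kafkastreamsblog;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleResultsConsumer {

 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "results-consumer");   //arbitrary group id
  props.put("auto.offset.reset", "earliest");  //start from the beginning of the topic
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  //subscribe to the results topic and print everything that arrives
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
   consumer.subscribe(Collections.singletonList("resultstopic"));
   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records)
     System.out.println(record.value());
   }
  }
 }
}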

Before we reach the streams application, let us discuss the producer, because we need to see what it is going to put on the kstreamstopic topic. Let's reuse one that comes from the Kafka tutorials. Just don't run it yet; it will be the last piece of the puzzle:

package is.kafkastreamsclass;

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {

 public static void main(String[] args) throws Exception{

  //Assign topicName to string variable
  String topicName = args[0].toString();

  // create instance for properties to access producer configs   
  Properties props = new Properties();

  //Assign localhost id
  props.put("bootstrap.servers", "localhost:9092");

  //Set acknowledgements for producer requests.      
  props.put("acks", "all");

  //If the request fails, the producer can automatically retry (0 means no retries here)
  props.put("retries", 0);

  //Specify buffer size in config
  props.put("batch.size", 16384);

  //Wait up to 1 ms so that several requests can be batched together
  props.put("linger.ms", 1);

  //The buffer.memory controls the total amount of memory available to the producer for buffering.   
  props.put("buffer.memory", 33554432);

  props.put("key.serializer", 
    "org.apache.kafka.common.serialization.StringSerializer");

  props.put("value.serializer", 
    "org.apache.kafka.common.serialization.LongSerializer");

  Producer<String, Long> producer = new KafkaProducer<>(props);

  for(int i = 0; i < 1000; i++)
   producer.send(new ProducerRecord<String, Long>(topicName, Integer.toString(i), (long) i));
  
  System.out.println("Message sent successfully to topic " + topicName);
  producer.close();
 }
}

This producer is somewhat dull, as it just sends 1000 (key, value) pairs to the topic, where the key is a string and the value a long, but for now it is enough for our experiments. I represent the output of the producer as follows:

"0" => 0
"1" => 1
"2" => 2
...
"999" => 999

Counting the occurrences of each key

Our focus here is the stream reader. Let us start by a basic one:

package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;


public class SimpleStreamsExercises {

 public static void main(String[] args) throws InterruptedException, IOException {
  String topicName = args[0].toString();
  String outtopicname = "resultstopic";

  java.util.Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    
  StreamsBuilder builder = new StreamsBuilder();
  KStream<String, Long> lines = builder.stream(topicName);

  KTable<String, Long> outlines = lines.
    groupByKey().count();
  outlines.toStream().to(outtopicname);
   
  KafkaStreams streams = new KafkaStreams(builder.build(), props);
  streams.start();
  
  System.out.println("Reading stream from topic " + topicName);
  
 }
}

Don't forget the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>is</groupId>
  <artifactId>kafkastreamsblog</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkastreamsclass</name>
  <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>10</java.version>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
 </properties>

 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>3.8.1</version>
   <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
  <dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
   <version>2.9.5</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>2.0.0</version>
  </dependency>

 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.0</version>
    <configuration>
     <release>10</release>
     <!-- <compilerArgs> <arg>add-modules</arg> <arg>javax.xml.bind</arg> 
      </compilerArgs> -->
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

To understand this application, we need to take a look at a diagram available on the Kafka Streams site, which summarizes the relation between the library's main classes:

[Figure from the Kafka Streams documentation: the relation between KStream, KGroupedStream and KTable, and the operations that convert between them.]

The groupByKey operation converts the stream to a KGroupedStream, by creating records of values indexed by the keys. In our case, since the producer outputs 1000 different keys, each key will have a single record (a 0 for key 0, a 1 for key 1, a 2 for key 2, and so on). Hence, the following count() will compute 1 for every key and convert the KGroupedStream into a KTable, which is similar to a regular database table, having the key as the primary key. This table will have 1000 entries, each with the value 1. To see the result, we convert the KTable back to a KStream using toStream().
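Using the same notation as before, after a single run of the producer the contents written to resultstopic should therefore be:

"0" => 1
"1" => 1
"2" => 1
...
"999" => 1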

To run the experiment, we need to pass the topic where the streams application will receive the data as a command-line argument, and the same goes for the producer. Failing to do so will crash the programs. In my case, I used kstreamstopic, but you may use another topic. Just start the applications in this order:

1 - kafka-console-consumer.sh

2 - then, SimpleStreamsExercises

3 - finally, the Producer.

Regarding the order in which the applications start: Kafka keeps messages on the topic for a configurable amount of time, so we could always get the messages from the topic later, if necessary. You may also repeat the execution of the Producer as many times as you want, with only slight changes in the results (the value of the count() will keep increasing).
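One more practical note: if your broker does not auto-create topics, you can create kstreamstopic and resultstopic beforehand with the shell tools, along these lines (the exact flags depend on your Kafka version and installation):

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kstreamstopic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic resultstopic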

Converting from Long to String

But if you run this setup you may end up getting nothing on the Kafka-console-consumer, except 1000 apparently empty lines. Why? Because we are outputting longs instead of strings, so the console consumer prints the raw 8-byte binary encoding of each count, which is made of non-printable characters. Let us change our code slightly, to ensure that we can properly see the results of our operation:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> lines = builder.stream(topicName);

KTable<String, Long> outlines = lines.groupByKey().count();

outlines.mapValues(v -> "" + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));
   
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

What is new here? The mapValues(), which uses a lambda expression to transform the long value "v" into a String. However, this change alone crashes the program, because we specified the DEFAULT_VALUE_SERDE to be a Long. Hence, the attempt to write a String to the outtopicname stream would fail. Therefore, we need to explicitly tell the library that we are producing the output in a String format (the Produced.with at the end). In other words, the final stream has a format that is different from the initial stream and from the KTable: those two had Long values, while the final stream has String values.

Now, you should see this in the Kafka-console-consumer shell:

3
3
3
3
3
...

or whatever number of times you have run the producer, instead of 3 (e.g., I'm actually seeing a thousand 10s).
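As an aside, instead of relying on the default serdes set in the properties, we could also declare the input types explicitly when building the stream. A minimal sketch (same topic and types as before), using Consumed.with from org.apache.kafka.streams.kstream:

  //Sketch: declaring the input serdes at the source instead of using the defaults
  KStream<String, Long> lines = builder.stream(topicName, Consumed.with(Serdes.String(), Serdes.Long()));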

This is still not very handy, because we cannot see the keys. To see them we may change the lambda expression in the mapValues to become:


mapValues((k, v) -> k + " => " + v)

i.e., it receives the key-value pair and replaces the value (because the function is "mapValues") with a string containing the key, the arrow, and the value, which is much nicer (don't worry about the 12; your case will likely be different, perhaps smaller):

...
989 => 12
990 => 12
991 => 12
992 => 12
993 => 12
994 => 12
995 => 12
996 => 12
997 => 12
998 => 12
999 => 12
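For reference, the complete statement after this change looks like this (the same Produced.with as before, since we are still writing Strings):

outlines.mapValues((k, v) -> k + " => " + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));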

Reduce()

What about summing all the values of a given key?

KTable<String, Long> outlines = lines.
    groupByKey().
    reduce((oldval, newval) -> oldval + newval);
outlines.mapValues((k, v) -> k + " => " + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));

We swap the count() for a reduce(). The lambda expression in the reduce keeps accumulating the new values that show up for each key. The reduce stores the result, as it is stateful (mind the legend in the earlier figure regarding reduce()). For example, you might see the following output in the Kafka-console-consumer shell:

985 => 3940
986 => 3944
987 => 3948
988 => 3952
989 => 3956
990 => 3960
991 => 3964
992 => 3968
993 => 3972
994 => 3976
995 => 3980
996 => 3984
997 => 3988
998 => 3992
999 => 3996

Materialized views

Now, the case for a materialized view. Materialized views allow us to query the tables, either directly by fetching the value of a key, or in ranges, as shown in this case:


package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;


public class SimpleStreamsExercises {

 private static final String tablename = "exercises";

 public static void main(String[] args) throws InterruptedException, IOException {
  String topicName = args[0].toString();
  String outtopicname = "resultstopic";

  java.util.Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    
  StreamsBuilder builder = new StreamsBuilder();
  KStream<String, Long> lines = builder.stream(topicName);

  KTable<String, Long> countlines = lines.
    groupByKey().
    reduce((oldval, newval) -> oldval + newval, Materialized.as(tablename));
  countlines.mapValues(v -> "" + v).toStream().to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));


  KafkaStreams streams = new KafkaStreams(builder.build(), props);
  streams.start();
  
  
  System.out.println("Press enter when ready...");
  System.in.read();
  while (true) {
   ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(tablename, QueryableStoreTypes.keyValueStore());
   System.out.println("count for 355:" + keyValueStore.get("355"));
   System.out.println();
   // Get the values for a range of keys available in this application instance
   KeyValueIterator<String, Long> range = keyValueStore.range("880", "980");
   while (range.hasNext()) {
     KeyValue<String, Long> next = range.next();
     System.out.println("count for " + next.key + ": " + next.value);
   }
   range.close();
   Thread.sleep(30000);
  }  
 }
}

After sending a few more messages with the Producer, and pressing Enter, we get this result on the streams application:

Press enter when ready...

count for 355:355

count for 880: 880
count for 881: 881
count for 882: 882
count for 883: 883
count for 884: 884
count for 885: 885
count for 886: 886
count for 887: 887
count for 888: 888
count for 889: 889
count for 89: 89
count for 890: 890
count for 891: 891
count for 892: 892
count for 893: 893
count for 894: 894
count for 895: 895
count for 896: 896
count for 897: 897
count for 898: 898
count for 899: 899
count for 9: 9
count for 90: 90
count for 900: 900
count for 901: 901
count for 902: 902
count for 903: 903
count for 904: 904
count for 905: 905
count for 906: 906
count for 907: 907
count for 908: 908
count for 909: 909
count for 91: 91
count for 910: 910
count for 911: 911
count for 912: 912
count for 913: 913
count for 914: 914
count for 915: 915
count for 916: 916
count for 917: 917
count for 918: 918
count for 919: 919
count for 92: 92
count for 920: 920
count for 921: 921
count for 922: 922
count for 923: 923
count for 924: 924
count for 925: 925
count for 926: 926
count for 927: 927
count for 928: 928
count for 929: 929
count for 93: 93
count for 930: 930
count for 931: 931
count for 932: 932
count for 933: 933
count for 934: 934
count for 935: 935
count for 936: 936
count for 937: 937
count for 938: 938
count for 939: 939
count for 94: 94
count for 940: 940
count for 941: 941
count for 942: 942
count for 943: 943
count for 944: 944
count for 945: 945
count for 946: 946
count for 947: 947
count for 948: 948
count for 949: 949
count for 95: 95
count for 950: 950
count for 951: 951
count for 952: 952
count for 953: 953
count for 954: 954
count for 955: 955
count for 956: 956
count for 957: 957
count for 958: 958
count for 959: 959
count for 96: 96
count for 960: 960
count for 961: 961
count for 962: 962
count for 963: 963
count for 964: 964
count for 965: 965
count for 966: 966
count for 967: 967
count for 968: 968
count for 969: 969
count for 97: 97
count for 970: 970
count for 971: 971
count for 972: 972
count for 973: 973
count for 974: 974
count for 975: 975
count for 976: 976
count for 977: 977
count for 978: 978
count for 979: 979
count for 98: 98
count for 980: 980

This seems awkward, because 98 shows up between 979 and 980, but keep in mind that the keys are strings and are therefore ordered lexicographically.

Windowed streams

What if we want to restrict the results to the last x minutes, where x is configurable? In this case we can do as follows:

package is.kafkastreamsblog;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;


public class SimpleStreamsExercises {

 public static void main(String[] args) throws InterruptedException, IOException {
  String topicName = args[0].toString();
  String outtopicname = "resultstopic";

  java.util.Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exercises-application");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    
  StreamsBuilder builder = new StreamsBuilder();
  KStream<String, Long> lines = builder.stream(topicName);

  KTable<Windowed<String>, Long> addvalues = lines.
    groupByKey().
    windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1))).
    reduce((aggval, newval) -> aggval + newval, Materialized.as("lixo"));
  addvalues.toStream((wk, v) -> wk.key()).map((k, v) -> new KeyValue<>(k, "" + k + "-->" + v)).to(outtopicname, Produced.with(Serdes.String(), Serdes.String()));

  KafkaStreams streams = new KafkaStreams(builder.build(), props);
  streams.start();
  
  System.out.println("Reading stream from topic " + topicName);
  
 }
}


We are basically applying a window of 1 minute to the results, and therefore we may get:

988 => 988
989 => 989
990 => 990
991 => 991
992 => 992
993 => 993
994 => 994
995 => 995
996 => 996
997 => 997
998 => 998
999 => 999

I.e., the sum of the values received in the last minute. You may play with this value and change it to 10 minutes, for example. You will notice that the results might differ (even without sending new messages with the producer):

988 => 2964
989 => 2967
990 => 2970
991 => 2973
992 => 2976
993 => 2979
994 => 2982
995 => 2985
996 => 2988
997 => 2991
998 => 2994
999 => 2997
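The only change needed for that variant is the argument of windowedBy(), e.g., for a 10-minute window:

windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(10)))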

In fact several variants of windows exist, but I will not cover them here.

Saturday, October 6, 2018

Creating a MySQL Datasource in WildFly 14

To create a MySQL datasource you first need to configure the MySQL driver, which doesn't come configured by default. To do this, I did the following. Remember to have the MySQL server running. Don't start WildFly yet.

Firstly, follow this link to install the MySQL Driver in WildFly 14.

Secondly, you need to start WildFly 14 and access the management console on localhost at port 9990: http://localhost:9990/.

Note that this will not work if you don't have a management user. If this URL gives you a WildFly text page with no management options, you most likely need to run the add-user.sh or add-user.bat in the bin directory of the WildFly installation, before proceeding.

Once you do that, you need to select the following option:

[Screenshot: the datasource creation option in the WildFly management console.]

Then, a few dialogs will pop up, but in the end you need to have the following data. Please keep in mind that at some point you need to insert the username and the password you use to access MySQL. You should also note the name of the database: proj2 in this case. For each different database you will need a different datasource. In the end you will have the opportunity to test the connection to the database. If it fails, check the output of WildFly.

Please mind the java:/MySqlDS JNDI name. This is the name that you must give in the persistence.xml file in the Java Persistence API project deployed in the same WildFly:

<jta-data-source>java:/MySqlDS</jta-data-source>
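For context, a minimal persistence.xml using this datasource could look roughly like this (the persistence unit name and the Hibernate property are illustrative assumptions, not something imposed by WildFly):

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence" version="2.1">
 <persistence-unit name="proj2PU" transaction-type="JTA">
  <!-- the JNDI name configured above -->
  <jta-data-source>java:/MySqlDS</jta-data-source>
  <properties>
   <!-- illustrative: let Hibernate create/update the schema during development -->
   <property name="hibernate.hbm2ddl.auto" value="update"/>
  </properties>
 </persistence-unit>
</persistence>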



Saturday, April 21, 2018

The Use Cases of the Cloud

Intro

A few days ago, I engaged in a discussion on whether we should use cloud computing to deploy the software we will develop in the scope of a research project. As a result of this discussion, I felt the need to clearly identify the cases where the cloud is useful. Please note that the discussion that follows is somewhat one-sided: I don't discuss the cases where the cloud is not useful, although these are to some extent implicit, as they are the ones that do not fit the favorable cases.

I would like to thank my colleague Paulo Rupino for providing me useful comments on this discussion.


Discussion


An important discussion that often takes place is when to run software in the cloud instead of on premises. As I see it, there are basically three situations where going to the cloud might be advantageous, depending on the specific case:

1) The cloud has technology that is not available on premises. For example, consider two clusters of application servers in different regions served by a DNS + local load balancer scheme. Achieving the same level of availability and response times using privately owned hardware and software would certainly be too complicated for the standard user. Big data provides another example, as Amazon, Google or Microsoft can leverage their huge volumes of data to provide machine learning services that are simply not available on premises. This could be the case of Amazon AWS Rekognition, for example.

Some other cases are less clear, as solutions may actually exist on premises, but one does not know whether they perform at the same level as cloud-based solutions. Still, even when performance is comparable (or even better on premises), this takes us to the second point.

2) It is much easier to deploy on the cloud. Deploying and maintaining a running solution on premises may be too complicated. A standard scheme with a cluster of application servers in a single region serves as an example here. Maybe we could find the appropriate hardware, application server, database, fault-tolerant load balancer and so on, but a scenario like this can be set up and running in minutes on Amazon, with Elastic Beanstalk for example, whereas it would certainly take a long time to deploy privately. This may motivate companies to deploy on the cloud to reduce their workforce, which leads us to the next point.

3) The economics favors the cloud. Even if you dare to install the hardware and software on premises, it might not be worth it. Consider the following cases:

  • A site with very low demand year-round that has some short-lived peaks, e.g., due to some sort of periodic event. In this case, over-provisioning just for the events would be too costly.
  • AWS Step and Lambda functions provide the other case: if most of the time our application is not running, we may want to pay per use instead of paying per time.
  • The company may want to exchange capex for opex.