PutObject to Object Storage from a Spark App with the AWS SDK

If you are designing an analytics platform or a streaming data pipeline, then given today's technology landscape, it is highly likely that you have decided to start with Apache Spark. Spark supports multi-language development: Scala (its native language), Java, and Python. It also includes a powerful machine learning library (MLlib) that you can incorporate into your platform if you plan to turn it into a real-time machine learning platform.

A typical Spark application creates a SparkSession, works with DataFrames, RDDs, Datasets, and SQL queries, and then stores the query results in a data warehouse or data lake.

We are going to discuss the scenario where you run your own S3-compatible service rather than a public cloud object storage service such as AWS or GCP. In this case, resolving the IP address mapped to your S3 domain name is the crucial part.
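A quick way to see what your machine actually resolves is to ask the JVM, since that is the resolver your S3 client will go through. The sketch below is a minimal illustration, assuming a plain Java runtime with no extra libraries; the class name EndpointResolution is hypothetical. On typical Linux and macOS setups the OS resolver consults /etc/hosts before DNS, so an entry added there should show up in this lookup.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

// Hypothetical helper: shows which IP the JVM resolves for the S3 host name.
class EndpointResolution {
    // Returns the resolved IP address, or empty if the name cannot be resolved.
    // The lookup goes through the OS resolver, which typically honors /etc/hosts.
    static Optional<String> resolve(String host) {
        try {
            return Optional.of(InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }
}
```

For example, EndpointResolution.resolve("your-domain-name-of-s3") should return your S3 server's IP; an empty result means the S3 client will not be able to connect either.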

A sample Spark application in Scala looks like this:

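import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("SparkScalaApp")
  .master("local")
  .getOrCreate()
// header=true assumes the first line of the CSV holds column names such as Status
val df = spark.read.option("header", "true").csv("/Users/home/Dataset/webLog.csv")
df.createOrReplaceTempView("view")
val result = spark.sql(
  """SELECT Status, COUNT(*) AS Count
    |FROM view
    |WHERE Status = 200
    |GROUP BY Status""".stripMargin)
// saveAsTable returns Unit, so write the table separately from the result we show
result.write.saveAsTable("logtable")
result.show()
spark.stop()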

If you are using Java, the same code looks like this:

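import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
    .builder()
    .appName("SparkJavaApp")
    .master("local")
    .getOrCreate();
// Java Spark context, only needed if you also want the RDD-based Java APIs
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// SparkSession replaces the older SQLContext; header=true assumes the CSV has a header row
Dataset<Row> df = spark.read().option("header", "true").csv("/Users/home/Dataset/webLog.csv");
df.createOrReplaceTempView("view");
Dataset<Row> result = spark.sql("SELECT Status, COUNT(*) AS Count FROM view WHERE Status = 200 GROUP BY Status");
result.write().saveAsTable("logtable");
result.show();
spark.stop();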

The application starts by creating a Spark session, reads the log file webLog.csv into a DataFrame, and then runs a simple count query on the success code 200.
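To make the semantics of the query concrete, here is a plain-Java sketch of the same filter-and-count over raw CSV lines, without Spark. It assumes a simple CSV with a header row and no quoted commas; the class name StatusCount and the column layout in any input you pass are illustrative, and Spark's CSV reader handles far more cases than this.

```java
import java.util.List;

// Illustrative equivalent of: SELECT status, COUNT(*) FROM view WHERE status = 200 GROUP BY status
class StatusCount {
    // Counts data rows (the header line is skipped) whose column at statusIndex equals status.
    // Assumes plain comma-separated values without quoting.
    static long countStatus(List<String> csvLines, int statusIndex, String status) {
        return csvLines.stream()
                .skip(1) // first line is the header
                .map(line -> line.split(",", -1))
                .filter(cols -> cols.length > statusIndex && cols[statusIndex].trim().equals(status))
                .count();
    }
}
```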

result is a Dataset<Row> (a DataFrame) with two columns, Status and Count, computed from the log file, and result.show() prints it to the console. We do not just want to calculate the results, though; we want to store them somewhere like S3 or Cassandra.
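If the aggregated result is small, one option is to collect it on the driver (in Java, result.collectAsList()) and upload it as a single text object with the SDK's putObject(bucketName, key, content) overload, which accepts a String. The sketch below shows only the serialization step, assuming the rows have already been copied into a Map from status to count; the class name ResultCsv and the CSV layout are illustrative choices, not part of the original app.

```java
import java.util.Map;

// Hypothetical helper: turns (Status, Count) pairs into CSV text suitable
// for uploading as a String object body.
class ResultCsv {
    static String toCsv(Map<String, Long> countsByStatus) {
        StringBuilder sb = new StringBuilder("Status,Count\n");
        // Iteration order follows the map; use a LinkedHashMap to keep row order stable.
        for (Map.Entry<String, Long> e : countsByStatus.entrySet()) {
            sb.append(e.getKey()).append(',').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```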

You are probably familiar with the AWS SDK, which you can add as a dependency through sbt (the Scala build tool) or Maven to make its APIs available in your project. But unlike with Amazon's own S3 service, writing to another S3-compatible store through the S3 API is not as straightforward. Why? Because you need a working address-resolution mechanism on your machine. With AWS S3, the client connects to a domain name like bucket-name.s3.amazonaws.com, which public DNS already resolves for you.
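The difference comes from how the object URL is built. By default the SDK uses virtual-hosted-style addressing, where the bucket becomes part of the host name; with path-style addressing, the bucket moves into the path instead. The sketch below builds both forms for a hypothetical endpoint host such as s3.example.internal, so you can see which names need to resolve. It appends keys verbatim, whereas the real SDK also URL-encodes them.

```java
// Illustrative only: builds the two URL styles an S3 client can use for the same object.
class S3UrlStyles {
    // Virtual-hosted style: the bucket is part of the host name,
    // so DNS or /etc/hosts must resolve "<bucket>.<endpointHost>".
    static String virtualHosted(String scheme, String endpointHost, String bucket, String key) {
        return scheme + "://" + bucket + "." + endpointHost + "/" + key;
    }

    // Path style: the host stays fixed and the bucket moves into the path,
    // so a single name-to-IP mapping covers every bucket.
    static String pathStyle(String scheme, String endpointHost, String bucket, String key) {
        return scheme + "://" + endpointHost + "/" + bucket + "/" + key;
    }
}
```

Because /etc/hosts does not support wildcards, virtual-hosted style would need one entry per bucket name. If your service supports path-style requests, calling withPathStyleAccessEnabled(true) on AmazonS3ClientBuilder keeps every request on the single endpoint host.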

So how do you handle this situation? Here is the quickest solution:

Create an S3 client, but this time with your own S3 endpoint URL and authentication keys.

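import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
// Static credentials issued by your own S3-compatible service
AWSCredentials credentials = new BasicAWSCredentials("your-access-key", "your-secret-key");
// Create an S3 client that talks to your endpoint instead of amazonaws.com
AmazonS3 s3Client = AmazonS3ClientBuilder
    .standard()
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("your-domain-url", "region-name"))
    .withCredentials(new AWSStaticCredentialsProvider(credentials))
    .build();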

Next, add an entry to the /etc/hosts file that maps your S3 domain name to its IP address:

your-ip your-domain-name-of-s3
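Two details commonly trip people up here: the hosts entry must match the host part of the endpoint URL, not the whole URL, and each line maps exact names only. The following sketch checks both against hosts-file text, assuming an endpoint string that includes a scheme such as https://; HostsCheck is a hypothetical helper, and any host names and IPs you feed it are your own.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical helper: verifies that the endpoint's host has a matching hosts-file entry.
class HostsCheck {
    // Extracts the host from an endpoint such as "https://s3.example.internal:9000".
    // Expects a scheme; URI.getHost() returns null without one.
    static String hostOf(String endpoint) {
        return URI.create(endpoint).getHost();
    }

    // Returns the IP mapped to hostname in hosts-file text, if any.
    // Each line is "IP name [aliases...]"; text after '#' is a comment.
    static Optional<String> lookup(String hostsContent, String hostname) {
        for (String line : hostsContent.split("\n")) {
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (entry.isEmpty()) continue;
            String[] fields = entry.split("\\s+");
            for (int i = 1; i < fields.length; i++) {
                if (fields[i].equalsIgnoreCase(hostname)) {
                    return Optional.of(fields[0]);
                }
            }
        }
        return Optional.empty();
    }
}
```

For example, with the line 10.0.0.5 s3.example.internal in the hosts text, lookup(hosts, hostOf("https://s3.example.internal:9000")) returns Optional[10.0.0.5], while lookup(hosts, "logs.s3.example.internal") is empty, which is exactly what a virtual-hosted-style request for a bucket named logs would run into.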

Now calling putObject() is simple:

s3Client.putObject(bucketName, objectKey, object);

Bingo! You are done. You should now see your data, strings, or files written into the buckets of your S3 service.

In case you are wondering whether you can use a PutObjectRequest instance with your S3 client, the answer is yes. S3 is an object storage API implemented not just by Amazon but by many other vendors, and those vendors do not want their clients to struggle with yet another API. Because AWS is the most widely used service out there, S3-compatible services aim to make the AWS APIs work against them in the same way they work against AWS itself.
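When you build a PutObjectRequest from an InputStream in SDK v1, you typically attach an ObjectMetadata carrying the content length (setContentLength) and, if your service validates it, a Content-MD5 value (setContentMD5). The sketch below computes both with the JDK alone, assuming a UTF-8 string payload; UploadMetadata is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper: computes metadata values for an upload of a UTF-8 string.
class UploadMetadata {
    // Length in bytes, not characters, since that is what the HTTP body carries.
    static long contentLength(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    // Content-MD5 is the base64 encoding of the raw 16-byte MD5 digest (not the hex string).
    static String contentMd5(String payload) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(payload.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }
}
```

These values would then go on the ObjectMetadata passed to new PutObjectRequest(bucketName, objectKey, inputStream, metadata).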

Super interested in Computer Science & Software Engineering with focus on AI, ML, Kubernetes