`Sink` is the 'loading' part of ETL (Extract, Transform, Load) in Flink. It is the last step of the data pipeline: it stores data in the data lake after the data has been extracted from a source and transformed into a specific format. Here is an example of how you can add a sink to a Flink `DataStream`:
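A minimal sketch (assuming a `DataStream<String>` named `stream` and a `FileSink<String>` named `sink`, built as shown in the sections below):

```java
// Attach the file sink to the stream; Flink then writes the records out as part files.
stream.sinkTo(sink);
```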
FileSink and StreamingFileSink
If you look at the documentation here, you will find both `StreamingFileSink` and `FileSink`. `StreamingFileSink` is the predecessor of `FileSink`, and the documentation states that `FileSink` supports both `BATCH` and `STREAMING` execution, while `StreamingFileSink` only supports streaming.
Finally, as of Flink 1.17, `StreamingFileSink` has been deprecated, so it is better to go with `FileSink` from now on.
The `addSink` function takes a `SinkFunction<T>` parameter, which is what you use with `StreamingFileSink`. For `FileSink`, call `sinkTo` instead to attach the sink logic.
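Side by side, the two variants look roughly like this (a sketch; the path and encoder are assumptions):

```java
// StreamingFileSink implements SinkFunction, so it is attached with addSink (legacy path):
stream.addSink(
        StreamingFileSink
                .forRowFormat(new Path("s3://data-storage/"), new SimpleStringEncoder<String>("UTF-8"))
                .build());

// FileSink implements the unified Sink interface, so it is attached with sinkTo:
stream.sinkTo(
        FileSink
                .forRowFormat(new Path("s3://data-storage/"), new SimpleStringEncoder<String>("UTF-8"))
                .build());
```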
In this post, I'll only talk about raw-file and compressed-file sinks, which I have worked with recently. For big-data file formats such as `parquet`, `orc`, and so on, refer to the connector guides in the documentation.
Sink out raw text data
First, you need an additional dependency for `FileSink`:
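The artifact in question is most likely the file connector; in Maven form (the version property is an assumption, match it to your Flink version):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
```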
and here are the configurations needed for the file sink (some are optional):
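A sketch of what that configuration can look like (the output path, date format, and thresholds are assumptions that match the description below):

```java
import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

final FileSink<String> sink = FileSink
        // Row format: serialize each record with an Encoder, here plain UTF-8 strings
        .forRowFormat(new Path("s3://data-storage/"), new SimpleStringEncoder<String>("UTF-8"))
        // One directory (bucket) per date-time, e.g. /2022-09-10_18-46
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd_HH-mm"))
        // Roll the part file after 15 minutes, after 5 minutes of inactivity, or at 1 GB
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(15))
                        .withInactivityInterval(Duration.ofMinutes(5))
                        .withMaxPartSize(MemorySize.ofMebiBytes(1024))
                        .build())
        // Part files are named my-data-<subtask>-<counter>.txt
        .withOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartPrefix("my-data")
                        .withPartSuffix(".txt")
                        .build())
        .build();
```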
In `forRowFormat`, you define the output path and an `Encoder` that serializes each row to the output stream. If your data is plain text, the default `SimpleStringEncoder` with `UTF-8` encoding is fine.
`withBucketAssigner` defines the rule for the directory (bucket) the data is stored in. `DateTimeBucketAssigner` generates directories in a date-time format; with the setting above, each record goes into a directory like `/2022-09-10_18-46`.
`withRollingPolicy` decides how and when the in-flight stream data is rolled out as a finished output file. With the rule above, a single `.txt` file keeps collecting data from the stream and is rolled out when any of the following holds:
- data has been collected for at least 15 minutes
- there have been no new elements for 5 minutes
- the file size has reached 1 GB
With the output file configuration, the file name is defined as `my-data-????.txt` through the prefix/suffix.
So finally, the output ends up at `s3://data-storage/2022-09-10_18-46/my-data-????.txt`.
In other cases, a user may need every element to be placed in a separate file. File release is governed by the rolling policy, so in my case I wrote a custom policy that extends the default `CheckpointRollingPolicy`:
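A sketch of such a policy (the class name is mine; the essential part is `shouldRollOnEvent` returning `true`):

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

public class RollOnEveryElementPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        // Roll out a new part file for every single element.
        return true;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // No time-based rolling; the event-based rule above already covers everything.
        return false;
    }
}
```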
`shouldRollOnEvent` is defined to always return `true`, so the sink rolls out a file for every element. With this policy, every string record from the `DataStream` is written as a separate text file.
Compress ('.gzip') output sink file
Now let's think about the case where you want to sink the data as a compressed file (like `gzip`). In most cases you will want this, to reduce file size and network traffic; as you know, both come down to cost.
For a compressed format, you need to use the bulk format, with a `BulkWriter` that is created from a `BulkWriter.Factory`. This applies not only to compressed files such as zip and `gzip`, but also to big-data formats like `parquet`, `avro`, and `orc`.
You can find several built-in writers here. For `gzip`, you need to use `CompressWriterFactory`.
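A sketch of the bulk-format sink built with the compress writer (the path and codec name are assumptions, and the `flink-compress` artifact must be on the classpath):

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;

final FileSink<String> sink = FileSink
        .forBulkFormat(
                new Path("s3://data-storage/"),
                // DefaultExtractor turns each record into bytes; "Gzip" picks Hadoop's GzipCodec
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                        .withHadoopCompression("Gzip"))
        .build();
```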
`CompressWriters` is a builder for creating a `CompressWriterFactory` instance, and `DefaultExtractor` turns each record into a byte array for writing. The resulting bytes can be compressed with one of the following Hadoop compression codecs via `withHadoopCompression`:
- DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
- gzip: org.apache.hadoop.io.compress.GzipCodec
- bzip2: org.apache.hadoop.io.compress.BZip2Codec
- LZO: com.hadoop.compression.lzo.LzopCodec
Okay, the code is done. But it can throw an exception like the following:
java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at org.apache.flink.formats.compress.CompressWriterFactory.withHadoopCompression(CompressWriterFactory.java:76)
This means the Hadoop `Configuration` class cannot be found on the classpath. In this case you should add the following package:
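The missing class lives in Hadoop's common library, so a dependency along these lines should resolve it (artifact choice and version property are assumptions; `hadoop-client` would also bring it in, and the version should match your environment):

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
```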