Bir Flink Hikayesi…

Süleyman Fazıl Yeşil
17 min readJan 26, 2025

--

“Kredi kartından yurtdışında aynı firmada son 1 saatte 2 adetten fazla işlem yapıldığı ve toplam tutar 5000TL üzerinde olduğunda alarm oluştur.”

“Kredi kartından son 1 saat içinde 2 adetten fazla nakit avans çekildiğinde alarm oluştur.”

Finansal veya finansal olmayan işlemleri belirli kurallar ve desenler çerçevesinde yakın gerçek zamanlı (near real time) olarak kontrol etmek istiyorsunuz. Bunu nasıl yaparsınız?

Bu soruya farklı cevaplar vermek mümkün. Bağlama göre cevaplar farklılaşabilir. Tek bir doğru cevap yok diğer tüm mühendislik problemlerinde olduğu gibi. Bağlamdan bağımsız ideal çözüm hiçbir zaman olmaz. Kusursuz tek bir doğru yöntemden bahsedildiğinde uzak durmaya çalışmak lazım. Dar bir gözlükle bakılıyordur çünkü. Mükemmel ve kusursuz çözümü değil de, Neal Ford’un dediği gibi kendi bağlamımızda ihtiyacımızı karşılayan en az kötü olançözümü aramak lazım. Mevcut bir uygulamayı mı revize ediyorsunuz, sıfırdan mı yazıyorsunuz? İşlem hacmi 100 TPS mi olacak, 1K TPS mi, 10K TPS mi, daha mı fazla? Ne kadar gecikme kabul edilebilir, gerçek zamanlı mı, yakın gerçek zamanlı mı yoksa batch mi olacak? Kaç senaryo olacak? Senaryoları kim tanımlayacak, tanımlama ne ölçüde esnek olacak? Uygulama nerede çalışacak? Kim çalıştıracak? Geliştirme maliyeti ne? Operasyon maliyeti ne? Karmaşıklık düzeyi ne? Problemi çözecek teknik ekibin yetkinleri ve deneyimi ne? Geliştirme için ne kadar vakit var? Vesaire vesaire.

Bizim kendi bağlamımızda bu soruya nasıl cevap aradığımızın hikayesini aktarmaya çalışacağım.

Yöntemlerden ilk akla geleni, bunun için bir uygulama yazmak. Bir arayüz üzerinden kuralları tanımlamak. Gelen tüm işlemleri veritabanına kaydetmek. Her yeni gelen işlem için işlem geçmişini de dikkate alarak kuralları çalıştırmak. Herhangi bir kurala takılan işlemi riskli olarak işaretlemek ve akabinde gerekli aksiyonları almak. Uygulamayı kendiniz yazdığınız için tüm güç ve esneklik sizde, fakat sorgulamayı veritabanı üzerinden yaptığınız durumda 10K TPS ve üzerine çıktığınızda ilişkisel veritabanının fiziksel limitlerine takılabiliyorsunuz. Malum ilişkisel veritabanları Exadata kullansanız bile nihayetinde fiziksel sınırlara sahip. Dikeyde sonsuza kadar büyüyemiyorsunuz. Yatayda büyüyebilen dağıtık NOSQL veritabanları ise ilişkisel veritabanlarının sorgulama esnekliğine ve yeteneklerine sahip değil. Dolayısıyla ilişkisel veritabanı üzerinden sorgulama yaparak çalışan bir uygulama belli bir noktadan sonra geçerli bir seçenek olmaktan uzaklaşmaya başlıyor.

İkinci yöntem ise geçmişe ait veri tutabilen akan veri işleme (statfeul stream processing) platformlarını kullanmak. Bu platformlar “son bir saatte”, “son bir günde”, “son bir haftada”, “toplamda” vb. gibi müşteri, iş yeri vb bazlı istatiksel verileri, veriler aktıkça kümülatif olarak bellekte biriktirebiliyor. Dolayısıyla bu tarz şeyler için veritabanına sorgulama yapma ihtiyacı ortadan kalkıyor. Bu da veritabanındaki yükü oldukça hafifletiyor. Yük veri işleme platformuna kaymış oluyor. Veri işleme platformları bu bilgileri bellek üzerinde tuttuklarıve dağıtık mimaride çalışabildikleri için performanslı çalışıyorlar. Akan veri işleme platformları kendi yazdığınız uygulama kadar esnek olmasa da, genel amaçlı yazıldıklarından dolayı çeşitli kısıtlara tabi olsalar da, üst seviyeli veri yapıları ve veri işleme API’leri, dağıtık sistemlerin karmaşıklığı ve hata yönetimi gibi birçok konuyu arka planda çözüp izole ettiklerinden ve yatayda ölçeklenebilir yapıda olduklarından dolayı oldukça güçlü bir seçenek sunuyorlar. Bu alanda birçok çözüm bulunuyor. Biz, yakın gerçek zamanlı ve düşük gecikmeli çalıştığından, durum bilgisi (state) tutabildiğinden, bu alandaki önde gelen olgun araçlardan biri olduğundan ve bizim ihtiyacımıza benzer örneklerde özelllikle kullanıldığından dolayı Flink’i tercih ettik.

Bu yazıda, elde ettiğimiz tecrübeler çerçevesinde Flink’in kavram dünyasını tanıtıp, entegrasyonel ve operasyonel konulara değinmeyi planlıyorum.

Öncelikle Kavramsal olarak Flink’in temel işleyişini, Flink SQL ile senaryo görevlerini (job) tanımlamayı aktarmayı planlıyorum.

Sonrasında Flink REST API’sini kullanarak, Flink’i kendi uygulamamıza entegre etmeyi ve job’ların yaşam döngüsünü yönetmeyi aktaracağım.

Akabinde otomatik entegrasyon testlerini göstereceğim.

Operasyonel tarafta ise Flink konteynır imajının özelleştirilmesi, Flink Kubernetes Helm Chart kullanarak Flink cluster oluşturmayı anlatacağım.

Son olarak da performans konusuna bakacağım. Yazılımda özellikle yeni kullanmaya başladığınız bir araçla ilgili varsayımları bir kenara bırakıp, performans testleri yapmanın ve farklı konfigürasyonları farklı şeyleri denemenin ne kadar önemli olduğunu aktarmaya çalışacağım.

Spotify’da takip ettiğim çok güzel bir podcast var, “Nasıl Olunur?” isimli, Nilay Örnek’in hazırladığı. Balmain baş sneaker tasarımcısı Safa Şahin’in konuk olduğu bölümde, Safa Şahin Nike’ta çalıştığı dönemde sneaker tasarım sürecini ve malzeme kullanımını aktarırken dedi ki: “İki cent, bir cent, tartışılıyor. Çünkü bir milyon adet üretiliyor, iki milyon üç beş milyon adet ürettiği için, atıyorum %45 deri kullamışsın, %55 kumaş kullanmışsın, ya deri biraz pahalı diyor, sen bunu %44 yap diyor, tasarımı hafiften çalıyorsun filan böyle, fiyatı aşağıya düşürüyorsun, böyle fiyat analizleri var, onun üzerinden tartışıyorsunuz, fiyatı tam yakalayınca tamam bu şekilde kalsın diyorsunuz”.

Yani ölçek büyünce küçük şeyler büyüteç tutulmuş gibi oluyor, önemli hale geliyor. Kaç TPS olacak, kaç senaryo olacak filan bunlar önemli oluyor. Veriyi nasıl kodladığınız, veri boyutu, bant genişliği, kaynak kullanımı, performans hepsi önemli hale geliyor. Bu tarz konulardaysa varsayımlar yanlış çıkabiliyor. Deneyip test etmek, etkisini görmek ve ona göre ilerlemek hayati oluyor.

Akan Veri İşleme (Stream Processing)

Akan veri işleme alanında sektörde pek çok araç bulunuyor. Bu araçlar sundukları özellikler, güçlü ve zayıf yönleri, olgunluk düzeyleri ve sektörel benimsenme düzeyi ve yaygınlıkları açısından farklılık gösteriyorlar. Tamamen apayrı bir uzmanlık konusu. Aşağıda bu ekosistemin güncel bir görünümü yer alıyor.

Kaynak: The Data Streaming Landscape 2025 — Kai Waehner

Flink Temel Kavramlar

Apache Flink açık kaynak bir veri işleme platformu. Stream ve batch veri işlemeyi bütünleşik olarak sunan dağıtık bir platform. İlk sürümü 2011'de yayınlanmış. Java ve Scala kullanılarak yazılmış. Veri işlemek için Java, Scala, Python ve SQL kullanılarak uygulama yazılabiliyor. Komünite, Flink Forward isimli katılımı yüksek bir yıllık konferans düzenliyor.

Flink kendini “Stateful Computations over Data Streams” olarak tanımlıyor, yani veri akışı üzerinde state (tarihsel bilgi, durum) tutabilen bir veri işleme aracı.

Aynı platform üzerinde hem stream hem de batch veri işleme yapabiliyor. Stream’i (akan veri) sınırsız (unbounded) veri, batch’i sınırlı (bounded) veri olarak ele alıyor.

Flink 4 seviyeden oluşan katmanlı bir API sunuyor.

Flink APIs

Flink mimarisi ise aşağıdaki gibi:

Flink Task Manager, worker niteliğinde. Flink Job’lar Task Manager’lar üzerinde çalışıyor.

Flink Job Manager, Flink Job’ları yönetiyor. Başlatılması, durdurulması, statülerinin takibi, otomatik Task Manager oluşturulması, REST API sunulması vb görevleri yapıyor.

Geç gelen işlemlerin dikkate alınıp sıralanabilmesi için Watermark mekanizması sunuyor.

İç tutarlılık ve hata durumlarında otomatik hata yönetimi ve kaldığı yerden devam edebilmek için checkpoint ve savepoint mekanizmaları sunuyor. Chekpoint modları üzerinden tutarlılık için hem “tam olarak bir kez” hem de “en az bir kez” (exactly once, at least once) opsiyonlarını sunuyor.

Veriyi paralel olarak işleyebiliyor. Flink Job başlarken parallelism değerini ayarlamak yeterli, yatayda büyüyüp küçülebilmek için.

Veri işlemede “son 1 saatte” gibi zaman penceresi tanımlanabiliyor, bunun için window yapısı var. Farklı zaman penceresi çeşitleri sunuyor: tumbling, sliding vb.

Hem veri işleme zamanı (proccessing time) hem de olay zamanı (event time) bazlı çalışılabiliyor.

Kubernetes platformuna operator ile kurulum yapılabiliyor ve otomatik kaynak yönetimi yapabiliyor.

Flink SQL

Flink SQL, Flink’in en üst seviyeli API’si. Örneğin yazının başında ifade edilen senaryo Flink SQL ile basitçe yazılabiliyor. ChatGPT’ye senaryoyu söylediğinizde size aşağıdaki gibi bir karşılık sunuyor.

Senaryo:

Kredi kartından yurtdışında aynı firmada 
son 1 saatte 2 adetten fazla işlem yapıldığı
ve toplam tutar 5000TL üzerinde olduğunda alarm oluştur.

Flink SQL Karşılığı:

CREATE TABLE alerts (
card_id STRING,
firm_id STRING,
total_amount DOUBLE,
transaction_count INT,
alert_time TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/your_database',
'table-name' = 'alerts',
'username' = 'your_username',
'password' = 'your_password'
);

INSERT INTO alerts
SELECT
card_id,
firm_id,
SUM(amount) AS total_amount,
COUNT(*) AS transaction_count,
CURRENT_TIMESTAMP AS alert_time
FROM transactions
WHERE amount > 0
GROUP BY
card_id,
firm_id,
TUMBLE(transaction_time, INTERVAL '1' HOUR)
HAVING COUNT(*) > 2
AND SUM(amount) > 5000;

Flink REST API Entegrasyonu

Flink REST API’sini uygulamaya entegre etmek oldukça basit. İlgili versiyonun API dökümanına link üzerinden erişip indirebilirsiniz. Doküman OpenAPI 3 ile yazılmış, Postman’e API koleksiyonu olarak aktarabiliyoruz, ayrıca openapi-generator ile istemci tarafındaki kodları otomatik oluşturabiliyoruz.

Flink API Koleksiyonunun Postman’e Aktarımı

API doküman linkini Postman’e yapıştırıp API koleksiyonunu aşağıdaki gibi oluşturabiliyoruz.

Flink API dokümanının Postman’e aktarımı

Flink API koleksiyonunu oluşturduktan sonra aşağıdaki gibi, Flink API üzerinden Flink cluster’ına erişip yeni job oluşturabiliyoruz, askıya alabiliyoruz, statü sorgulaması vb şeyler yapabiliyoruz.

Flink API Koleksiyonu üzerinden Flink API Kullanımı

Flink API Client Oluşturulması

Flink API’yi uygulamamıza entegre etmek için istemci (client) tarafındaki kodları (stub, api client vb) otomatik oluşturabiliyoruz, OpenAPI standart spesifikasyonu ve buna dayanan araçlar üzerinden.

Bu amaçla openapi-generator npm paketini indirip kullanıyoruz.

npm install @openapitools/openapi-generator-cli -g

API Client için için yeni bir proje oluşturup, proje dizini içinde aşağıdaki şekilde script çalıştırıp API entegrasyon kütüphanesi oluşturuyoruz.

openapi-generator-cli generate \
-i https://nightlies.apache.org/flink/flink-docs-release-1.16/generated/rest_v1_dispatcher.yml \
--api-package org.apache.flink.apiclient.api \
--model-package org.apache.flink.apiclient.model \
--invoker-package org.apache.flink.apiclient.invoker \
--group-id org.apache.flink.apiclient \
--artifact-id flink-api-client \
--artifact-version 1.16.0 \
-g java \
-p java8=true \
--library apache-httpclient \
--global-property skipFormModel=false \
--additional-properties useRuntimeException=true

Scripti çalıştırdıktan sonra aşağıdaki gibi bir proje oluşuyor, bunu mvn repomuza yayınlayıp uygulamımızda kullanabilir hale geliyoruz.

Uygulamadan Flink API’ye Erişim

public class FlinkApiPlayground {
private static final Logger logger = LogManager.getLogger(FlinkApiPlayground.class);

private static final String FLINK_API_BASE_PATH = "http://xxxx";

private static final FlinkApi flinkApi;
private static final FlinkApiConfig flinkApiConfig;

public static class FlinkApi extends DefaultApi {
public FlinkApi(FlinkApiConfig config) {
ApiClient apiClient = Configuration.getDefaultApiClient();
apiClient.setBasePath(config.getBasePath());
setApiClient(apiClient);
}
}
public static class FlinkApiConfig {
private String basePath;
// ...

// getters & setters
}

static {
flinkApiConfig = new FlinkApiConfig();
flinkApiConfig.setBasePath(FLINK_API_BASE_PATH);
flinkApi = new FlinkApi(flinkApiConfig);
}

public static void main(String[] args) {
logger.info("Welcome to Flink Api Playground...");
try {
ClusterOverviewWithVersion overview = flinkApi.getClusterOverview();
logger.info("Flink cluster overview: {}", overview);

JarListInfo jarListInfo = flinkApi.getJarList();
logger.info("Flink jar list: {}", jarListInfo);

String jarId = jarListInfo.getFiles().get(0).getId();
logger.info("Jar id: {}", jarId);

JarRunResponseBody jobResponse = flinkApi.submitJobFromJar(jarId, false, null, null, null, null, null, null);
logger.info("Job response: {}", jobResponse);

} catch (Exception e) {
logger.error(e);
}
logger.info("Enough playing... :)");
}

Flink Entegrasyon Testleri

Flink entegrasyonun testleri için farklı stratejiler mevcut ihtiyaca göre.

Flink entegrasyon testleri için bir mini cluster altyapısı sunuyor, JUnit5 ile ilgili eklentileri kullanıp embedded bir cluster oluşturup entegrasyon testleri yapabiliyoruz. API entegrasyon testleri için bu altyapıyı kullanmak mümkün.

Flink Job’larının çalışması içinse mini cluster oluşturmaya ihtiyaç olmuyor. TestContainers ile Kafka ve DB’yi birlikte ayağa kaldırıp Flink Job’ı test metodunda çalıştırıp sonuçları kontrol etmek yeterli.

Flink Mini Cluster ile API Entegrasyon Testleri

class FlinkClientIntegrationTest {
@RegisterExtension
public final static MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(getClusterConfig());

// örnek: https://github.com/apache/flink/blob/master/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
private static FlinkApi flinkApi;

private static MiniClusterResourceConfiguration getClusterConfig() {
return new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build();
}

@BeforeAll
static void beforeAll(@InjectClusterRESTAddress URI restAddress) {
logger.info("Flink mini test cluster configured...");
logger.info("Flink mini cluster url: {}", restAddress);

FlinkApiConfig flinkApiConfig = new FlinkApiConfig();
flinkApiConfig.setBasePath(restAddress.toString());
flinkApi = new FlinkApi(flinkApiConfig);
}

@Test
@DisplayName("should fetch cluster overview successfully")
void should_fetch_cluster_overview_successfully(@InjectClusterRESTAddress URI restAddress) {
ClusterOverviewWithVersion clusterOverview = flinkApi.getClusterOverview();
logger.info("Cluster overview: {}", clusterOverview);

Assertions.assertNotNull(clusterOverview);
}

}

Flink Job Entegrasyon Testleri

Bu bölümde örnek bir Flink Job ve testini aktaracağım. Flink Job kodu büyük ölçüde ChatGPT yardımıyla oluşturuldu örnek amaçlı bir koda ihtiyaç duyduğumuz için.

Kullanılan veri yapıları şöyle:

  • Transaction: İşlemleri modelliyor.
  • TransactionDeserializer: Flink tarafında deserialization işlemi için kullanılıyor. ChatGPT ile oluşturuldu.
  • SimpleFlinkJob: Flink SQL ile yazılmış, basit bir Flink Job sınıfı. İçindeki senaryo tutarı 100'den büyük işlemler için alarm oluşturup DB’deki alerts tablosuna yazıyor. ChatGPT ile oluşturuldu.
  • AbstractKafkaIntegrationTest: Test containers kütüphanesi üzerinden hem Kafka hem de PostgreSQL container’ı ayağa kaldırıp sunan baz sınıf. Implementasyon detayları bulunmuyor. Testcontainers kullanımı için şu yazıya göz atabilirsiniz.
  • FlinkJobIntegrationTest: Entegrasyon test sınıfı. AbstractKafkaIntegrationTest altyapısını kullanıyor. BDD stiliyle yazılmış entegrasyon testi içeriyor. Temelde yaptığı SimpleFlinkJob’ı çalıştırıp bir işlem beslemek ve alerts tablosuna alarm kaydının yazıldığını teyit etmek.

Transaction Sınıfı:

class Transaction {
private Integer amount;
private String customerNo;
// getters & setters
}

TransactionDeserializer:

public class TransactionDeserializer implements KafkaRecordDeserializationSchema<Transaction> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Transaction> out) throws IOException {
String json = new String(record.value());
Transaction transaction = objectMapper.readValue(json, Transaction.class);
out.collect(transaction);
}
@Override
public TypeInformation<Transaction> getProducedType() {
return TypeInformation.of(Transaction.class);
}
}

SimpleFlinkJob:

public class SimpleFlinkJob {
public static void run(String kafkaBootstrapServers, String topic, String dbUrl, String username, String pass) throws Exception {
// 1. Flink stream execution environment ve table environment oluşturuyoruz
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8888);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
env.enableCheckpointing(Duration.ofMillis(100).toMillis());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 2. KafkaSource oluşturuyoruz ve custom deserializer kullanıyoruz
KafkaSource<Transaction> kafkaSource = KafkaSource.<Transaction>builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(topic)
.setGroupId("flink-transactions-consumer")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new TransactionDeserializer())
.build();

// 3. Kafka'dan gelen veriyi alıyoruz
DataStream<Transaction> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

// 4. Veriyi Flink SQL ile işlemek için bir view oluşturuyoruz
tableEnv.createTemporaryView("transactions", kafkaStream);

// 5. PostgreSQL JDBC sink için bir yapı oluşturuyoruz
String jdbcSink = "CREATE TABLE alerts ("
+ "id SERIAL PRIMARY KEY,"
+ "customer_no VARCHAR,"
+ "amount DOUBLE PRECISION"
+ ") WITH ("
+ "'connector' = 'jdbc',"
+ "'url' = '" + dbUrl + "',"
+ "'username' = '" + username + "',"
+ "'password' = '" + pass + "',"
+ "'driver' = 'org.postgresql.Driver',"
+ "'table-name' = 'alerts'"
+ ")";
tableEnv.executeSql(jdbcSink);

// 5. Veritabanına yazma ve filtreleme işlemini tek bir SQL sorgusu ile yapıyoruz
String flinkJobSql = "INSERT INTO alerts "
+ "SELECT ROW_NUMBER() OVER () AS id, customerNo AS customer_no, amount "
+ "FROM transactions "
+ "WHERE amount > 100";
tableEnv.executeSql(flinkJobSql);

// 8. İşlemi başlatıyoruz
env.execute("Flink Kafka to DB with Alerts");
}
}

FlinkJobIntegrationTest:

class FlinkJobIntegrationTest extends AbstractKafkaIntegrationTest {
static Producer<String, Transaction> producer;

Connection conn;
Statement stmt;

static String bootstrapServers;
static String jdbcUrl;
static String username = "xxx";
static String pass = "xxx";
static String topic = "transactions";

static {
createKafkaTopic(topic);
initKafkaProducer();
bootstrapServers = kafkaContainer.getBootstrapServers();
jdbcUrl = "jdbc:postgresql://localhost:" + postgresqlContainer.getFirstMappedPort() + "/app-schema";
}

@BeforeEach
void beforeEach() throws Exception {
Connection conn = DriverManager.getConnection(jdbcUrl, username, pass);
stmt = conn.createStatement();
}
@AfterEach
void afterEach() throws SQLException {
stmt.close();
conn.close();
}

@Test
@DisplayName("should create alerts for transactions fed to kafka ")
void should_create_alerts_for_transactions_fed_to_kafka() throws Exception {
CountDownLatch jobStartedLatch = new CountDownLatch(1);
Runnable simpleFlinkJob = new Runnable() {
@Override
public void run() {
try {
String jdbcUrl = postgresqlContainer.getFirstMappedPort() + "/app-schema";
SimpleFlinkJob.run(bootstrapServers, topic, jdbcUrl, username, pass);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
Runnable clusterReadinessCheck = new Runnable() {
@Override
public void run() {
while (true) {
try {
RestTemplate restTemplate = new RestTemplate();
String result = restTemplate.getForObject("http://localhost:8888/jobs/overview", String.class);
boolean isJobRunning = result.contains("\"state\":\"RUNNING\"");
boolean isTaskRunning = result.contains("\"running\":1");
if (isTaskRunning & isJobRunning) {
jobStartedLatch.countDown();
}
Thread.sleep(1000);
} catch (Exception ex) {
logger.error(ex);
}
}
}
};
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(simpleFlinkJob); // flink job'ı çalıştır
executorService.execute(clusterReadinessCheck);// ayrı bir thread'de cluster hazır hale geldi mi kontrol ederek beklet


jobStartedLatch.await(1, TimeUnit.MINUTES); // Job başlayıncaya kadar bekle, maks 1dk, ama o kadar sürmez genelde
Thread.sleep(2000);

feedTransactionToKafkaTopic(topic); // kafkaya işlem besle

Thread.sleep(2000); // sleep kullanmak iyi değil genel olarak :)


// DB'de alerts tablosunda 1 kayıt olduğunu teyit et
ResultSet alertCountResultSet = stmt.executeQuery("select count(*) from alerts");
alertCountResultSet.next();
Assertions.assertEquals(1, alertCountResultSet.getInt(1), "expected 1 alert");
}


void feedTransactionToKafkaTopic(String topic) {
Transaction trx = new Transaction();
trx.setCustomerNo("123");
trx.setAmount(100);
producer.send(new ProducerRecord<>(topic, trx.getCustomerNo(), trx));
}

static void initKafkaProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
}

Flink Konteynır İmajının Özelleştirilmesi

Flink konteynır imajı açık kaynak olarak sunuluyor. Checkpoint ve Savepoint’lerin S3'e yazılabilmesi için S3 plugini aktif etmek ve çalışma anında ihtiyaç duyulan DB driver vb kütüphaneleri imajın içine gömmek mümkün.

FROM flink:1.16.1

ENV FLINK_LIB_DIR=/opt/flink/lib
ENV OPTIONAL_LIB_DIR=/opt/flink/opt
ENV S3_PLUGIN_DIR=/opt/flink/plugins/flink-s3-fs-hadoop

WORKDIR ${S3_PLUGIN_DIR}
RUN cp ${OPTIONAL_LIB_DIR}/flink-s3-fs-hadoop-1.16.1.jar ./

WORKDIR ${FLINK_LIB_DIR}
COPY shared-job-libs ./

Flink Cluster — Flink Kubernetes Operator

Flink Kubernetes platformu üzerinde Flink cluster oluşturmak için Helm chart bazlı bir Kubernetes operatörü sunuyor. Operatörün mimarisi ve genel işleyişi aşağıdaki resimlerde gösterildiği gibi.

Kaynak: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/
Kaynak: Architecture | Apache Flink Kubernetes Operator

Operatör FlinkDeployment ve FlinkSessinJob isimli iki CRD (Custom Resource Definition) sunuyor. Biz FlinkDeployment kullanacağız.

Operatör ve sunduğu FlinkDeployment CRD üzerinden cluster’daki ortamı (Job Manager, Task Manager) oluşturup ayarlayabiliyoruz. Job Manager’ı HA veya tek kopya olarak çalışacak ayarlıyoruz. Task Manager’ları ise otomatik olarak ihtiyaç duydukça yeni bir kopya oluşturacak şekilde ayarlıyoruz.

Kubernetes cluster seviyesinde operasyonlar yaptığı için yetkilendirme önem kazanıyor. Eğer üçünçü parti olarak kurumsal müşterelere ürün tedariki yapıyorsanız, ürünün tüm clusterda istediği gibi hareket etmesi istenmez bu nedenle rol ve yetki tanımları önem kazanır.

Aşağıda flink-operator ve flink olmak üzere iki rol tanımı ve kullandıkları yetkiler gösteriliyor.

Kaynak: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/operations/rbac/

OpenShift’te Yetkilendirme: Roller ve Rollerin Bağlanması

OpenShift üzerinde local namespace bazında veya cluster seviyesinde roller tanımlanabilmektedir. Local roller local namespace’lere bağlanabilirken (RoleBinding), cluster seviyesindeki roller hem cluster seviyesinde (ClusterRoleBinding) hem de local namespace’ler seviyesinde (RoleBinding) bağlanabilmektedir. Rolün etki alanını cluster role olması belirlemiyor, nasıl bağladığımız belirliyor.

Kaynak: Using RBAC to define and apply permissions | Authentication and authorization | OpenShift Container Platform 4.11

Operatör Helm chartı ile cluster’ı kurmak ve güncellemek için otomasyon sisteminde kullanılmak üzere “flink-deployer” isimli bir servis hesabı (ServiceAccount) tanımlıyoruz. Akabinde gerekli yetki düzenlemesini yapıyoruz aşağıda adım adım verildiği şekliyle.

crd-admin Rolü ve flink-deployer’a Yetki Verilmesi

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: crd-admin
rules:
- verbs:
- '*'
apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-crd-admin
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: crd-admin

crd-admin cluster rolünü yine cluster seviyesinde flink-deployer servis hesabına bağlıyoruz. Çünkü cluster seviyesinde CRD (Custom Resource Definition) ekleyebilmesi gerekiyor. Rol tanımına baktığımızda sadece gerekli API’lere (customresourcedefinitions) erişim izni verdiğimiz görülüyor.

flink-operator Rolü ve flink-deployer’a Yetki Verilmesi

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-operator
rules:
- verbs:
- '*'
apiGroups:
- ''
resources:
- pods
- services
- events
- configmaps
- secrets
- verbs:
- list
apiGroups:
- ''
resources:
- nodes
- verbs:
- '*'
apiGroups:
- apps
resources:
- deployments
- deployments/finalizers
- replicasets
- verbs:
- '*'
apiGroups:
- extensions
resources:
- deployments
- ingresses
- verbs:
- '*'
apiGroups:
- flink.apache.org
resources:
- flinkdeployments
- flinkdeployments/status
- flinksessionjobs
- flinksessionjobs/status
- verbs:
- '*'
apiGroups:
- networking.k8s.io
resources:
- ingresses
- verbs:
- '*'
apiGroups:
- coordination.k8s.io
resources:
- leases

Operatör rolü cluster seviyesinde ve epey geniş yetkilere sahip. Tüm cluster’ı oluşturacağı için. Bu rolü flink-cluster namespace için local olarak olacak şekilde flink-deployer’a bağlıyoruz.

kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-flink-operator
namespace: flink-cluster
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: flink-operator

webhook-admin Rolü ve flink-deployer’a Yetki Verilmesi

Bu özelliği direkt olarak kullanmasak da, operatör Helm chart’ı bu yetkiye gereksinim duyuyor. CRD tanımlamada olduğu gibi bu Rolü de cluster seviyesinde veriyoruz. Aşağıda görüldüğü gibi sadece webhook API’lerine olacak şekilde sınırlı yetki bir yetki bu.

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: webhook-admin
rules:
- verbs:
- '*'
apiGroups:
- admissionregistration.k8s.io
resources:
- mutatingwebhookconfigurations
- validatingwebhookconfigurations
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-webhook-admin
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: webhook-admin

flink-deployer’a Proje Admin Yetkisi Verilmesi

Son olarak flink-deployer servis hesabına flink-cluster projesi (namespace) için local admin yetkisi veriyoruz.

kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-flink-cluster-admin
namespace: flink-cluster
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: admin

Flink Operator Helm Chart’ının Özelleştirilmesi ve FlinkDeployment Tanımlaması

Helm chart’lar ve özelleştirilmesi konusunda şu yazıya göz atabilirsiniz. Helm Chart proje dizin yapısı şöyle:

charts dizini altında orijinal operatör Helm chart’ını görüyoruz. templates dizininde bizim kendi eklediğimiz yapılar bulunuyor. values.yaml içinde ise özelleştirdiğimiz değerler bulunuyor. Genel özelleştirme sonrasında kurulum yaparken ortam bazlı olarak bu values.yaml değerlerini değiştirmek de mümkün.

Aşağıda deployment.yaml içeriği yer alıyor.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-deployment
labels:
{{- include "flink-operator.labels" . | nindent 4 }}
spec:
image: {{ .Values.image.flinkImage }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
flinkVersion: {{ .Values.flinkVersion }}
flinkConfiguration:
{{- toYaml .Values.flinkConfiguration | nindent 4 }}
mode: {{ .Values.clusterDeploymentMode }}
serviceAccount: flink-cluster
ingress:
template: {{ .Values.ingressHost }}
taskManager:
resource:
{{- toYaml .Values.taskManagerResource | nindent 6 }}
jobManager:
replicas: {{ .Values.jobManagerReplicaCount }}
resource:
{{- toYaml .Values.jobManagerResource | nindent 6 }}
podTemplate:
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: flink-main-container
envFrom:
- configMapRef:
name: {{ .Values.configMaps.ref }}

Konfigürasyon değerlerinin yer aldığı values.yaml içeriği şöyle:


flink-kubernetes-operator:
defaultConfiguration:
create: true
# Set append to false to replace configuration files
append: true
flink-conf.yaml: |+
# Flink Config Overrides
s3.access-key: xxx
s3.secret-key: xxx
s3.endpoint: https://s3.xxx.amazonaws.com
s3.path.style.access: true
podSecurityContext:
# helm bug, cannot override security context of map type in sub chart
# waiting for the fix,issue: https://github.com/helm/helm/issues/9027 , https://github.com/helm/helm/pull/11440
runAsUser: 1000759999
operatorSecurityContext: {}
webhookSecurityContext: {}
watchNamespaces: ["flink-cluster"]


configMaps:
ref: flink-deployment-config
values:
dbUrl: "jdbc:oracle:thin:@xxx:xxx/xxx?rewriteBatchedStatements=true"
dbUsername: "xxx"
dbPassword: "xxx"
kafkaConsumerGroupId: "xxx"
kafkaTopic: "xxx"
kafkaBrokersIpList: "kafka.flink-cluster.svc.cluster.local:9092"

image:
flinkImage: "registry.xxx.com/our-app-images/flink:1.16.1"
# When to pull the image.
# 'Always' option checks the image hash and pull if it different. Default is 'IfNotPresent'.
pullPolicy: Always

flinkVersion: v1_16
ingressHost: flink.apps.flink-cluster.xxx.com

flinkConfiguration:
taskmanager.numberOfTaskSlots: '5'
jobmanager.memory.jvm-metaspace.size: "536870912"
taskmanager.memory.jvm-metaspace.size: "536870912"
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled: "true"
kubernetes.operator.cluster.health-check.enabled: "true"
state.savepoints.dir: s3://flink-data/savepoints
state.checkpoints.dir: s3://flink-data/checkpoints
kubernetes.operator.periodic.savepoint.interval: 5h
kubernetes.operator.job.autoscaler.enabled: "true"
high-availability: kubernetes
high-availability.storageDir: s3://flink-data/recovery
parallelism.default: "1"
pipeline.operator-chaining: "false"
rest.flamegraph.enabled: "true"


taskManagerResource:
memory: 1024m
cpu: 1

jobManagerReplicaCount: 1
jobManagerResource:
memory: 2048m
cpu: 1

imagePullSecrets:
- name: xxx-registry

Her bir konfigürasyon değerinin ne işe yaradığına burada yazının daha fazla uzamaması için girmeyeceğim. İsimlendirmeler oldukça açık ve fazlasını merak edenler detayları için Flink ve Flink Operator dökümanlarına göz atabilirler.

Performans Testleri ve Optimizasyonlar

Yeni bir teknolojiyi kullanmaya başlıyorsanız, mimarinizde köklü değişikler oluyorsa performans testi yapmanız hayati önem taşır. Varsayımlarla ilerleyemezsiniz. Her bir konfigürasyonu ve etkilerini gözlemlemeniz ve buna göre optimizasyon yapmanız gerekir büyük sayılar yasası gereği. Böyle bir yasa var mı bilmiyorum tabi ki :)

Aşağıda test topoloji örneğini görüyorsunuz.

Flink cluster test topolojisi

Test ortamını hazırladıktan sonra küçük yüklerle başlayıp büyük TPSlere doğru ilerledik. Throughput, CPU kullanımı, RAM kullanımı, bant genişliği kullanımı gibi metrikleri OpenShift metrikleri üzerinden gözlemledik. Önce tek senaryo (job) ile test yaptık. Sonra senaryoları çoklayarak kaynak kullanımlarını inceledik.

Belli başlı bulgular ve çözüm yollarımız şöyle oldu:

  • İlk başta veriler JSON formatındaydı ve kullanılmayan alanları kaldırdığımızda kompakt olarak sadece dolu alanları gönderdiğimizde performans iyileşmesi olduğunu gördük.
  • Flame grafikleri üzerinden CPU kullanımını analiz ettiğimizde özellikle deserialization işlemlerinin darboğaz yarattığını gözlemledik.
  • Kafka mesajlarını sıkıştırdığımızda (compression) 4 kat civarında bir bant genişliği kazancı gördük.
  • Veri serileştirmesi için JSON yerine binary encoding kullanan Protobuf’ı kullandığımızda bant genişliğinde 5 kat bir kazanç elde ettik. CPU kullanımı ise yaklaşık 8 kat düştü. Serileştirme konusunda genel bir bakış için “1001 yüzlü veri” yazısına bakabilirsiniz.
  • Dinamik olarak Job deploy ettiğimiz için JVM metaspace bir noktada yetersiz kaldı ve varsılan 250MB değerini 512MB çıkardık.
  • Bir noktada CPU kullanımı düşük olduğu halde belli bir TPS değerini aşamadığımızı gördük. İncelediğimizde bant genişliğine takıldığımız farkettik. Tek Kafka partition üzerinden çalışırken partition sayısını 4e çıkardık. Paralelinde Flink Job paralelism değerini 4 yaptık. Yatayda paralel dört koldan ilerleyebilmesi için. Ayarlamalar sonrası bant genişliği probleminin belli ölçüde çözüldüğünü gördük. Fakat senaryo sayısının 800lere çıktığı durumda yine de toplam CPU ve bant genişliği kullanımı halen büyük sorun teşkil ediyordu. Çünkü her bir Flink Job bağımsız bir uygulama gibi çalıştığı için her biri Kafka topic’ten veriyi bağımsız olarak çekiyorlardı. Bir işlemin 800kez çekilmesi demek olduğu için halen büyük bir sorun vardı. Bunu çözmek için birden fazla senaryoyu tek bir Job içinde çalıştırmaya baktık. Jobların yaşam döngüsü yönetimini çok karmaşık ve özel terzilik isteyen bir hal alsa da (aynı paket içindeki bir senaryoyu dinamik olarak askıya almak gibi şeyler) kaynak yönetimi istediğimiz seviyelere geldi. 4 ayrı senaryoyu 4 ayrı Job (uygulama) olarak çalıştırmak yerine 4lü bir paket yapmak performansı neredeyse 4 kat artırdı. Fakat burada da bir noktada saturasyon olduğunu gördük ve kendi konfigürasyonlarımız çerçevesinde optimum sayıyı bulmaya çalıştık.
  • Watermark kullanımının CPU yükünü yüzde 50 artırdığını gördük. Watermark periyodunun bu durumda bir etkisi olmadığını gözlemledik. Vaka bazında eğer ihtiyaç yoksa watermark kullanmamaya karar verdik.
  • Tarih vb dönüşümlerin CPU kullanımını artırığını gördük ve tüm veriyi dönüştürüp işlemek yerine senaryolarda ihtiyaç oldukça lokal dönüşümleri yapmanın doğru olacağına karar verdik.

Bitirirken

Akan veri işleme (stream processing) oldukça geniş bir konu. Ayrı bir uzmanlık alanı. Günümüzün uygulamaları oldukça karmaşık. Birçok teknolojiyi bünyelerinde barındırıyorlar, destekliyorlar. Veri tabanları, iş analitiği ve raporlama araçları, makine öğrenmesi, büyük veri teknolojileri, frameworkler vb birçok aracı ve platformu ürünlerimize entegre ediyoruz. Bu da biz mühendislerin hepsinde derinlemesine uzmanlık kazanmasak da bir çok teknolojiyi ihtiyaçlarımız çerçevesinde araştırmamız ve nasıl çalıştığını bilmemizi gerektiriyor. Günümüz mühendislerinin oyun sahası oldukça geniş. Bu hem çoğu şeyde yüzeysel kalmamıza sebep olsa da, veya derinlemesine hakim olsak bile başka bir şeye geçtiğimizde çoğu detayı unutuyor olsak da, oyun böyle ve oyunu kuralına göre oynamaktan başka çare bulunmuyor. Kilerdeki çuval çuval bilgi birikimi ve tecrübe üzerinden değil de mutfak tezgahındaki mevcut yemek malzemeleri üzerinden dar bir gözlükle değerlendirildiğimiz günümüz dünyasında bu güç olsa da yapacak çok da bişey bulunmuyor, oyun böyle. Dolayısıyla bulunduğumuz anın, yaptığımız işin keyfini çıkarmaya çalışmak en iyisi sanki.

Umarım yazı, akan veri işleme konusu ile ilgilenenlere faydalı olur. Ben araştırırken, uğraşırken ve yazarken keyif aldım, umarım siz de okurken keyifle okursunuz. :)

Kaynakça

--

--

Süleyman Fazıl Yeşil
Süleyman Fazıl Yeşil

Responses (2)