Kafka : Kafka Streams에 관하여

1. Kafka Streams 개요

Kafka Streams는 토픽에 적재된 데이터를 상태기반(Stateful) 또는 비상태기반(Stateless)으로 실시간 변환 및 토픽에 적재하는 라이브러리.
카프카의 스트림 데이터 처리를 위해 아파치 스파크(Apache Spark), 아파치 플링크(Apache Flink), 아파치 스톰(Apache Storm), 플루언트디(Fluentd)와 같은 다양한 오픈소스 애플리케이션 존재.
Kafka Streams는 카프카에서 공식적으로 지원.

2. Kafka Streams 애플리케이션의 Task와 Partition

Streams 애플리케이션은 내부적으로 스레드를 1개 이상 생성하며, 스레드는 1개 이상의 Task를 갖음.
Streams 의 Task는 Streams 애플리케이션을 실행하면 생성되며 데이터 처리 최소 단위.

3개의 파티션으로 구성된 토픽을 처리함에 있어 Streams 애플리케이션은 내부적으로 3개의 Task를 갖음.

3. Kafka Streams Topology

Topology 란 2개 이상의 노드들과 선의 조합.
Topology 는 링형, 트리형, 성형 등이 있음.
Kafka Streams 에서는 Topology를 구성하는 노드 하나를 “프로세서(processor)” 라고 함.
Kafka Streams 에서는 Topology를 구성하는 선 하나를 “스트림(stream)” 이라고 함.

4. Kafka Streams Processor

종류 설명
Source Processor 데이터를 처리하기 위해 최초 노드, 하나 이상의 토피에서 데이터를 가져오는 역할
Stream Processor 다른 Processor가 반환한 데이터를 철하는 역할. 변환, 분기처리와 같은 로직 등을 수행
Sink Processor 데이터를 특정 카프카 토픽으로 저장하는 역할

Kafka Streams 개발은 Kafka Streams DSL (Domain Specific Language)과 Kafka Processor API를 활용한 2가지 방법이 있음.

5. Kafka Streams DSL (Domain Specific Language)

Kafka Streams DSL은 레코드의 흐름을 추상화한 KStream, KTable, GlobalKTable 3가지가 있음.

  • KStream

KStream 은 레코드의 흐름.
KStream 을 통해 데이터 조회 시, 토픽에 존재하는 모든 레코드 출력.
컨슈머로 토픽을 구독하는 것과 동일.

  • KTable

메세지 키를 기준으로 묶어서 활용.
토픽의 모든 레코드를 조회할 수 있는 KStream과는 다르게 유니크한 메세지 키를 통해 가장 최신 레코드를 사용.
파티션 1개와 대응하여 해당 파티션에 대한 데이터로 채워짐.

  • GlobalKTable

KTable과 동일하게 메세지 키를 기준으로 묶어서 사용.
KTable과 다르게 모든 파티션에 대응하여 데이터가 채워짐.

6. Kafka Streams & KTable, GlobalKTable

  • Streams 와 KTable 조인

KStream 와 KTable를 조인하려면 반드시 코파티셔닝(Co-partitioning)되어야 함.
코파티셔닝(Co-partitioning)이란 조인을 하는 2개의 데이터 파티션 개수가 동일하고 파티셔닝 전략(partitioning stretegy)을 동일하게 맞추는 작업.
KStream과 KTable의 메세지 키가 동일한 경우 조인을 수행.

조인를 수행하려는 토픽들에 대해 코파티셔닝(Co-partitioning)을 보장할 수 없음.

이런 경우 조인을 수행할 수 없음(코파티셔닝이 되지 않은 2개의 토픽을 조인하는 로직 수행시 TopologyException 발생).
조인 수행을 위해서는 리파티셔닝(repartitioning) 과정이 필요.

  • Streams 와 GlobalKTable 조인

GlobalKTable은 코파티셔닝되지 않은 Kstream과 데이터 조인 가능.
GlobalKTable은 Stream의 모든 Task에게 동일하게 공유가 가능.
모든 데이터를 저장하고 사용하기 때문에 로컬 스토리지 사용량이 증가, 네트워크, 브로커 부하가 클 수 있음.
많은 양의 데이터를 가진 토픽에 대한 조인은 리파티셔닝을 통해 KTable을 사용을 권장.

7. Kafka Streams Processor API

Git 명령어 정리

git clone

1
git clone https://XXXX@github.com/test/my-private-repo.git

Kafka : Kafka Producer에 관하여

1. Kafka Producer 의 시작

2. Kafka Producer 주요 옵션

옵션 설명
bootstrap.servers 카프카 클러스터에 연결하기 위한 호스트, 포트 정보 설정
client.dns.lookup 클라이언트가 하나의 IP와 연결하지 못할 경우에 다른 IP로 시도하는 설정
acks 카프카 메세지 수신에 대한 확인 수준 설정
buffer.memory 프로듀서가 카프카로 데이터를 보내기 위한 대기 메모리 바이트(byte) 설정
compression.type 프로듀서 메세지 전송시 압축 타입 설정(none, gzip, snappy, lz4, zstd 등)
max.in.flight.requests.per.connection 프로듀서가 한 번에 몇 개의 요청(Request)을 전송할 것인가에 대한 설정(ACK 없이 전송할 최대 요청 수)
retries 일시적인 장애에 대해 몇 번 재시도 할 것인가에 대한 설정
batch.size 프로듀서의 배치의 사이즈를 결정하는 결정하는 설정
linger.ms 프로듀서의 배치 메세지를 보내기 전에 추가적인 메세지를 위해 기다리는 시간 설정
transactional.id ???
enable.idempotence ???

3. Kafka Producer Transaction

Spring Framework : web.xml을 통한 WebApplicationContext의 생성

1. web.xml ?

  • web.xml은 WebApplication의 Deployment Descriptor(배포 설명자)이며 XML 형식
  • 애플리케이션의 클래스, 리소스, 구성 및 웹 서버가 이를 사용해서 웹 요청을 처리하는 방법을 기술
  • context.xml에서 WatchedResource를 통해 “WEB-INF/web.xml” 를 수정할 수 있음

2. web.xml 작성 예시

아래와 같은 내용이 일반적으로 등장함.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring/*-config.xml</param-value>
</context-param>

<listener>
<display-name>SpringContextLoader</display-name>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
<servlet-name>dispatcher</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring/dispatcher-servlet.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>

3. ContextLoaderListener 클래스 다이어그램 및 요약

ContextLoaderListener

  • 최초에 웹 애플리케이션이 실행되며 “context-param” 태그로 지정된 “contextConfigLocation”의 값을 읽음
  • “context-param” 태그에 지정된 “ContextLoaderListener” 클래스를 통해 애플리케이션이 구동되었음을 감지
  • “ContextLoaderListener” 는 java의 확장 패키지은 javax에 속해 있는 “ServletContextListener”를 구현함
    • “ServletContextListener”는 서블릿 컨텍스트에 대한 변경 사항에 대해 이벤트 형식으로 받을 수 잇음
  • “ContextLoaderListener” 는 spring-web의 클래스인 “ContextLoader”를 상속함
    • “ContextLoader”는 “Root WebApplicationContext” 를 초기화 하는 작업을 수행

4. ContextLoaderListener 설명

ContextLo``ader.initWebApplicationContext

“ContextLoaderListener”는 단순히 contextInitialized(…) 메서드를 통해 “context-param” 태그로 설정한 값을 전달받아 ContextLoader의 initWebApplicationContext(…)를 호출함.

ContextLoader.initWebApplicationContext

호출된 “initWebApplicationContext” 메서드 내부에는 context를 생성하는 과정을 거쳐 “servletContext”의 Attribute로 생성된 context를 저장. 여기서 해당 context의 저장할 때의 키를 “ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE”로 지정하는 것을 확인.

ContextLoaderListener_concept

결론적으로 정리를 하면 Servlet 컨테이너인 Tomcat이 실행되면서 “ServletContextListener”의 contextInitialized 를 통해 ServletContext를 “ContextLoaderListener”에 전달 하고 이를 다시 “ContextLoader”로 전달이 되고 있음. 이때, “ContextLoader”는 “WebApplicationContext” 를 생성하여 ServletContext의 속성으로 Root Context로 설정함.

6. DispatcherServlet 클래스 다이어그램 및 요약

DispatcherServlet

  • web.xml의 “servlet” 태그에 있는 서블릿을 등록
  • 최초 서블릿이 등록이 될 때, GenericServlet의 “+init(..config)” 메서드를 호출하며 ServletConfig를 매개변수로 전달
  • ServletConfig에는 “contextConfigLocation”의 값인 “classpath:spring/dispatcher-servlet.xml” 값을 갖고 있음
  • GenericServlet의 “+init(..config)”은 내부적으로 “init()”을 호출하며 이는 HttpServletBean가 구현함
  • HttpServletBean의 “init()”은 내부적으로 bean 프로퍼티를 설정을 완료 후 “#initServletBean()” 를 호출하며 해당 메서드는 FrameworkServlet에 구현
  • FrameworkServlet의 “#initServletBean()”를 통해 WebApplicationContext가 생성되며 생성된 WebApplicationContext의 부모로 context로 Root Context를 설정

7. DispatcherServlet 설명

GenericServlet.init

“Servlet”를 구현한 클래스는 서비스에 배치되는 시점에 서블릿 컨테이너가 “init(..config)” 메서드 메서드를 호출함. 여기서 GenericServlet이 이 메서드를 구현함. 내부적으로 이때 ServletConfig 정보가 넘어오며 서브릿을 구성하기 위한 정보와 더불어 web.xml에서 작성한 서블릿의 “contextConfigLocation” 정보도 넘어옴. 게다가 내부적으로 “init()”를 호출하며 해당 메서드는 HttpServletBean 클래스에서 구현함을 확인.

HttpServletBean.init()

초기 파라메터의 값을 활용하여 bean 설정을 위한 프로퍼티를 설정. 설정이 완료된 이후에 “#initServletBean()” 메서드를 호출함. 해당 메서드는 FrameworkServlet 클래스에서 구현함.

FrameworkServlet.initServletBean()

FrameworkServlet의 “#initServletBean()” 메서드는 내부적으로 “#initWebApplicationContext()”를 호출하며 WebApplicationContext를 생성함 이때 내부적으로 “#createWebApplicationContext(…)”를 호출하여 생성함.

FrameworkServlet.initServletBean()

“#createWebApplicationContext(…)”는 매개변수로 ApplicationContext 받음. ApplicationContext로 Root Context가 전달되며 이를 Parent Context로 설정.

8. 최종 정리

ServletContext를 통한 WebApplicationContext 설정

결과적으로 ServletContext는 최초에 실행되는 ContextLoaderListener를 넘어 DispatcherServlet 까지 전달이 되며 이를 통해 내부적으로 부모와 자식관계의 Context를 형성함.

ServletContext를 통한 WebApplicationContext 설정

WebApplicationContext는 BeanFactory를 상속받아 “+getBean(name)”와 같은 bean을 반환하는 메서드를 갖고 있음.

ServletContext를 통한 WebApplicationContext 설정

“+getBean(name)” 메서드는 내부적으로 BeanFactory를 호출하여 bean을 반환하고 있음. 여기서 BeanFactory는 AbstractBeanFactory 클래스를 상속받은 DefaultListableBeanFactory 클래스이며 부모 BeanFactory를 갖고 있음.

ServletContext를 통한 WebApplicationContext 설정

DefaultListableBeanFactory 클래스의 getBean은 자신이 소유한 bean에서 먼저 인스턴스를 찾고 만약 없을 경우 부모 BeanFactory에서 찾아서 반환하는 것을 확인할 수 있음.

결국 Root Context와 자식 Context와 부모와 지삭의 관계를 맺으며 자식 BeanFactory는 부모의 BeanFactory에서 자원을 공유할 수 있음. 단, 반대로 부모 BeanFactory에서 자식의 BeanFactory를 공유하지는 않음.

npm 명령어 정리

npm 버전 확인

1
npm -v

npm 버전 최신화

1
npm install -g npm

Linux 명령어 정리

curl : http 요청

  • http 요청

    1
    curl --location --request GET 'http://127.0.0.1:8401/v1/example?test=1' --header 'Content-Type: application/x-www-form-urlencoded' --header 'TEST: 4406c3c04215d36e36f964582b869e0d7c8eec8482e793e5512a08abd39516d6'
  • http 요청 응답 시간 확인

    1
    curl -o /dev/null -s -w %{time_total} --location --request GET 'http://127.0.0.1:8401/v1/example?test=1' --header 'Content-Type: application/x-www-form-urlencoded' --header 'TEST: 4406c3c04215d36e36f964582b869e0d7c8eec8482e793e5512a08abd39516d6'

서버 Core 확인

  • http 요청 응답 시간 확인

서버 용량 확인

  • 현재 마운트된 디스크 크기, 사용량, 남은 용량 확인

    1
    df -h
  • 하위 디렉토리 제외하고 현재 디록토리 용량 확인

    1
    du -hd 0 ./*

파일 압축, 압축 풀기

  • 특정 폴더 zip 으로 압축하기
    1
    zip -r backup.zip backup/

Elasticsearch-01 : 엘라스틱서치 개요 및 특징

1. 엘라스틱서치(Elasticsearch)란?

엘라스틱서치(Elasticsearch)는 루씬(Lucene) 기반의 검색 엔진입니다. 여기서 루씬(Lucene)은 자바로 개발된 오픈소스 정보검색 라이브러리입니다. 따라서 엘라스틱서치(Elasticsearch) 자바로 개발되었으며 오픈 소스로 출시되었습니다. 게다가 다양한 언어(자바, 닷넷(C#), PHP, 파이썬, 그루비 등) 클라이언트를 지원합니다. 엘라스틱서치(Elasticsearch)는 로그스태시(Logstash)라는 이름의 데이터 수집 및 로그 파싱 엔진, 그리고 키바나(Kibana)라는 이름의 분석 및 시각화 플랫폼과 함께 개발되었습니다. 이를 엘라스틱서치(Elasticsearch) + 로그스태시(Logstash) + 키바나(Kibana)를 같이 연동하여 사용한다는 의미로 ELK 혹은 ELK 스택(ELK Stack)라고 합니다.


2. 일래스틱서치(Elasticsearch)의 특징

엘라스틱서치(Elasticsearch)의 특징은 아래와 같습니다.

  • 설치 과정이 간단한 편, 우분투같은 데비안 기반 운영체제면 apt-get으로도 쉽게 설치 가능
  • 노드라고 불리는 프로세스 단위로 구성되어 있는데, 확장이 필요하다면 그저 새 노드를 실행하고 기존의 노드와 연결하여 scale-out이 매우 간단함
  • 데이터를 저장한 뒤 검색하기 위해 재실행과 같은 과정이 필요없음, 인덱싱만 끝나면 검색이 가능
  • 하나의 인덱스에 하나의 타입만을 구성할 수 있음
  • HTTP를 통해 JSON 형식의 Restful API를 사용할 수 있음
  • 오픈소스이므로, 다양한 기능의 플러그인이 존재
  • 하나 이상의 노드가 실행되면서, 죽은 노드가 있을 경우 자동으로 감지 후 다른 노드에게 맡기며 매우 안정적인 서비스를 보장
  • 비정형 로그 데이터를 수집하고 한곳에 모아 통계 분석을 할 수 있음
  • 전체적인 클러스터의 성능 향상을 위해 비용 소모가 큰 롤백과 트랜잭션 기능이 없음
  • 데이터 저장 시점에 해당 데이터를 색인한
  • 색인된 데이터는 1초 뒤에나 검색이 가능해져서 실시간으로 검색이 불가능. 또한 내부적으로 커밋(commit), 플러쉬(Flush)와 같은 복잡한 과정
  • 업데이트는 기존 문서를 삭제(delete)하고 다시 삽입(insert)하는 방식
  • ELK 스택을 구성하여 활용할 수 있음