[IoT] MQTT -> Kafka를 이용한 데이터 파이프라인 구축하기 #2

IoT 데이터 파이프라인 구축 시리즈

  1. 인트로

  2. EMQX 브로커 설정 및 데이터 수집(현재글)

  3. EMQX와 Kafka 연동

  4. Kafka Consumer를 활용한 데이터 소비


EMQX 란

EMQX는 IoT(사물인터넷) 및 AI 애플리케이션을 위해 설계된 세계에서 가장 확장성이 뛰어나고 신뢰할 수 있는 오픈 소스 MQTT 메시징 플랫폼이다.

MQTT 란

MQTT(Message Queuing Telemetry Transport)는 저전력, 낮은 대역폭, 불안정한 네트워크 환경에서도 안정적인 통신을 지원하는 경량 IoT 및 M2M 전용 메시징 프로토콜이다.

Publish-Subscribe(발행-구독) 모델을 사용하여 Publisher(데이터 전송)와 Subscriber(데이터 수신) 사이의 브로커(Broker)가 메시지를 중계하며, 1:N 통신에 매우 유용하다.


환경 변수 구성

먼저 클러스터 공통 설정과 노드별 포트를 정의한다.
아래는 env.local 기준이니 참고 부탁드립니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# .env.local

# 공통 설정
EMQX_ALLOW_ANONYMOUS=true

# 클러스터 설정 (모든 노드 공통)
EMQX_CLUSTER_DISCOVERY_STRATEGY=static
EMQX_CLUSTER_STATIC_SEEDS=emqx@emqx1.mqtt,emqx@emqx2.mqtt,emqx@emqx3.mqtt

# emqx1 포트
EMQX1_MQTT_PORT=1883
EMQX1_DASHBOARD_PORT=8080

# emqx2 포트
EMQX2_MQTT_PORT=1884
EMQX2_DASHBOARD_PORT=8081

# emqx3 포트
EMQX3_MQTT_PORT=1885
EMQX3_DASHBOARD_PORT=8082

# nginx 포트
NGINX_PORT=1886
NGINX_STAGE=local

EMQX 3노드 + Nginx 구성 확인

docker-compose.yml에서 EMQX 3개 노드와 Nginx(로드밸런서)를 함께 정의합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
services:
  emqx1:
    image: emqx/emqx:5.7.0
    container_name: emqx1
    environment:
      EMQX_NODE_NAME: emqx@emqx1.mqtt
      EMQX_CLUSTER__DISCOVERY_STRATEGY: ${EMQX_CLUSTER_DISCOVERY_STRATEGY}
      EMQX_CLUSTER__STATIC__SEEDS: ${EMQX_CLUSTER_STATIC_SEEDS}
      EMQX_ALLOW_ANONYMOUS: ${EMQX_ALLOW_ANONYMOUS}
      EMQX_LISTENER__TCP__EXTERNAL: 1883
    ports:
      - "${EMQX1_MQTT_PORT}:1883"
      - "${EMQX1_DASHBOARD_PORT}:18083"

  emqx2:
    image: emqx/emqx:5.7.0
    container_name: emqx2
    environment:
      EMQX_NODE_NAME: emqx@emqx2.mqtt
      EMQX_CLUSTER__DISCOVERY_STRATEGY: ${EMQX_CLUSTER_DISCOVERY_STRATEGY}
      EMQX_CLUSTER__STATIC__SEEDS: ${EMQX_CLUSTER_STATIC_SEEDS}
      EMQX_ALLOW_ANONYMOUS: ${EMQX_ALLOW_ANONYMOUS}
      EMQX_LISTENER__TCP__EXTERNAL: 1883
    ports:
      - "${EMQX2_MQTT_PORT}:1883"
      - "${EMQX2_DASHBOARD_PORT}:18083"

  emqx3:
    image: emqx/emqx:5.7.0
    container_name: emqx3
    environment:
      EMQX_NODE_NAME: emqx@emqx3.mqtt
      EMQX_CLUSTER__DISCOVERY_STRATEGY: ${EMQX_CLUSTER_DISCOVERY_STRATEGY}
      EMQX_CLUSTER__STATIC__SEEDS: ${EMQX_CLUSTER_STATIC_SEEDS}
      EMQX_ALLOW_ANONYMOUS: ${EMQX_ALLOW_ANONYMOUS}
      EMQX_LISTENER__TCP__EXTERNAL: 1883
    ports:
      - "${EMQX3_MQTT_PORT}:1883"
      - "${EMQX3_DASHBOARD_PORT}:18083"

  nginx:
    image: nginx:1.27
    container_name: nginx-mqtt
    environment:
      - NGINX_STAGE=${NGINX_STAGE}
      - NGINX_PORT=${NGINX_PORT}
    ports:
      - "${NGINX_PORT}:${NGINX_PORT}"

Nginx MQTT 로드밸런싱 설정

Nginx Stream 설정으로 MQTT 접속을 EMQX 노드에 분산합니다.

1
2
3
4
5
6
7
8
9
10
11
upstream emqx_backend {
    least_conn;
    server emqx1.mqtt:1883 max_fails=3 fail_timeout=30s;
    server emqx2.mqtt:1883 max_fails=3 fail_timeout=30s;
    server emqx3.mqtt:1883 max_fails=3 fail_timeout=30s;
}

server {
    listen ${NGINX_PORT};
    proxy_pass emqx_backend;
}

클러스터 기동

실행 전 기존 컨테이너/볼륨을 내린 뒤, 환경 파일 기준으로 재기동합니다.

1
2
3
# 로컬 환경
docker compose -p emqx-cluster --env-file env.local down --volumes --remove-orphans
docker compose -p emqx-cluster --env-file env.local up -d

MQTT 메시지 발행/구독 테스트

먼저 Subscriber를 열고, Publisher로 JSON 메시지를 발행해 데이터 수집을 확인합니다.

구독 (발행된 메세지 읽기)

1
2
3
4
5
6
# subscribe
mosquitto_sub -h 127.0.0.1 -p 1886 -t /mqtt/test/# -v

"C:\Program Files (x86)\mosquitto\mosquitto_sub.exe" -h MQTT 브로커 접속 URL -p 포트 -t MQTT 브로커에서 구독할 토픽 -v

"C:\Program Files (x86)\mosquitto\mosquitto_sub.exe" -h 192.168.1.16 -p 1886 -t /mqtt/test/# -v

발행 (메세지 발행)

1
2
3
4
5
6
7
8
9
10
# publish
mosquitto_pub -h 127.0.0.1 -p 1886 -t /mqtt/test/vehicle1 -m "{\"imei\":\"1234\",\"speed\":80,\"time\":\"2026-02-20 10:00:00\"}"

"C:\Program Files (x86)\mosquitto\mosquitto_pub.exe" -h MQTT 브로커 접속 URL -p 포트 -t MQTT 브로커에서 구독할 토픽 -m 메세지

# cleanSession = true
"C:\Program Files (x86)\mosquitto\mosquitto_pub.exe" -h 192.168.1.16 -p 1886 -t /mqtt/test/vehicle1 -m "{\"imei\":\"1234\",\"speed\":80,\"time\":\"2025-08-27 10:00:00\"}"

# cleanSession = false
"C:\Program Files (x86)\mosquitto\mosquitto_pub.exe" -h 192.168.1.16 -p 1886 -t /mqtt/test/vehicle1 -m "{\"imei\":\"0002\",\"speed\":111,\"time\":\"2025-09-01 10:00:00\"}" --repeat 1000 --repeat-delay 1 -q 1 -i vehicle_client_1 -c
1
2
3
4
5
{
  "imei": "1234",
  "speed": 80,
  "time": "2026-02-20 10:00:00"
}

클러스터 상태 점검

각 노드 상태와 클러스터 구성 노드를 확인합니다.

1
2
3
docker exec emqx1 /opt/emqx/bin/emqx_ctl status
docker exec emqx2 /opt/emqx/bin/emqx_ctl status
docker exec emqx3 /opt/emqx/bin/emqx_ctl status
1
2
docker exec emqx1 /opt/emqx/bin/emqx_ctl cluster status
# running_nodes에 emqx1/emqx2/emqx3가 모두 보여야 정상
1
2
docker exec nginx-mqtt tail -f /var/log/nginx/access/stream_access.log
# 요청이 서로 다른 EMQX 백엔드 IP로 분산되는지 확인

EMQX 대시보드 접속 및 초기 계정

아래 URL로 각 노드 대시보드에 접속할 수 있습니다.

1
2
3
EMQX1: http://localhost:8080
EMQX2: http://localhost:8081
EMQX3: http://localhost:8082

EMQX 기본 관리자 계정은 아래 값으로 시작합니다.

1
2
username: admin
password: public

최초 로그인 후 보안을 위해 비밀번호를 즉시 변경하는 것을 권장합니다.

Leave a comment