프로젝트 목표
최신 논문 데이터를 크롤링하여 Elasicsearch에 저장하고, Airflow DAG를 통해 주기적으로 데이터를 업데이트 하는 시스템을 구축한다.
크롤링한 데이터를 기반으로 검색 및 필터링 기능을 제공한다.
개발 환경
Apache Airflow 2.7.3
fastapi
elasticsearch
docker
Docker Desktop
Dockerfile (Airflow)
FROM apache/airflow:2.7.3
USER root
WORKDIR /opt/airflow
RUN apt update && apt install -y procps sudo
USER airflow
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Airflow 초기화
CMD ["bash", "-c", "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow scheduler & airflow webserver"]
불필요한 의존성을 줄이고 공식 지원되는 환경을 활용하기 위해 apache/airflow:2.7.3을 기반으로 컨테이너를 생성한다.
오류 해결을 위해 ps 명령어를 사용해야하는데, 이를 설치하기 위해서는 procps 패키지를 설치해주어야한다.
이 때, root 권한으로만 패키지 설정이 가능하기 때문에 USER root에서 패키지 설치를 진행했다.
USER airflow로 변경 후 파이썬 패키지를 설치하고, Airflow 데이터베이스 초기화 및 사용자를 생성하면 커맨드라인을 추가했다.
Dockerfile (FastAPI)
FROM python:3.9
WORKDIR /app
# 필요한 패키지 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 애플리케이션 코드 복사
COPY . .
# FastAPI 실행
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Airflow DAG
Airflow DAG 파일을 작성하기 위해서는 내가 가져올 논문의 응답 형태를 확인해야한다.
import requests
url = "https://api.biorxiv.org/covid19/0"
response = requests.get(url)
papers = response.json()
print(papers)
API 응답 결과를 구조화한 형태는 다음 표와 같다.
| 필드명 | 설명 | 예시값 |
| rel_doi | DOI (논문의 고유 식별자) | 10.1101/2025.02.02.24318641 |
| rel_title | 논문 제목 | Molecular and Immunological Signatures of Long COVID |
| rel_date | 출판 날짜 | 2025-02-04 |
| rel_site | 출처 (저널/프리프린트 서버) | medRxiv |
| rel_link | 논문 URL | https://medrxiv.org/cgi/content/short/2025.02.02.24318641 |
| rel_abs | 논문 초록 (Abstract) | The COVID-19 pandemic, caused by the SARS-CoV-2 virus, has led to an emerging health challenge known as long COVID... |
| rel_num_authors | 저자 수 | 2 |
| rel_authors | 저자 정보 (이름, 소속) | [{"author_name": "Jorine KN Hammink", "author_inst": "JorClinic BV"}, {"author_name": "Tim R van Elst", "author_inst": "Parasite Clinic"}] |
| version | 논문 버전 정보 | 1 |
| license | 라이선스 | cc_by_nd |
| type | 출판 상태 | PUBLISHAHEADOFPRINT |
| category | 논문 카테고리 | infectious diseases |
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
import requests
# Elasticsearch 연결하기
es = Elasticsearch("http://elasticsearch:9200")
def create_index_if_not_exists():
"""Elasticsearch 인덱스가 없으면 생성"""
index_name = "research_papers"
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name, body={
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"title": { "type": "text" },
"abstract": { "type": "text" },
"authors": { "type": "nested", "properties": {
"name": { "type": "text" },
"institution": { "type": "text" }
}},
"publication_date": { "type": "date" },
"doi": { "type": "keyword" },
"category": { "type": "text" },
"source": { "type": "keyword" },
"url": { "type": "keyword" }
}
}
})
print(f"Index '{index_name}' has been created.")
def fetch_papers(**kwargs):
"""API를 이용해 논문 데이터 가져오기"""
url = "https://api.biorxiv.org/covid19/0"
response = requests.get(url)
# API 응답 상태 확인
if response.status_code != 200:
print(f"API 요청 실패: {response.status_code}")
return []
papers = response.json().get("collection", [])
# XCom (Cross-Commuinication)을 이용해 데이터 저장하기
kwargs['ti'].xcom_push(key="papers", value=papers)
print(f"가져온 논문 데이터 수: {len(papers)}")
def index_papers(**kwargs):
"""Elasticsearch에 논문 색인하기"""
ti = kwargs['ti']
papers = ti.xcom_pull(task_ids="fetch_papers", key="papers")
if not papers:
print(f"색인할 논문 데이터가 존재하지 않습니다.")
return
for paper in papers:
authors_list = [{
"name": author.get("author_name", "Unknown"),
"institution": author.get("author_inst", "Unknown")
} for author in paper.get("rel_authors", [])]
doc = {
"title": paper.get("rel_title", "No Title"),
"abstract": paper.get("rel_abs", "No Abstract"),
"authors": authors_list,
"publication_date": paper.get("rel_date", "1970-01-01"),
"doi": paper.get("rel_doi", ""),
"category": paper.get("category", "Unknown"),
"source": paper.get("rel_site", "Unknown"),
"url": paper.get("rel_link", "")
}
es.index(index="research_papers", body=doc)
# DAG 설정
default_args = {
"owner": "airflow",
"start_date": datetime(2025, 2, 4),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"paper_etl",
default_args=default_args,
schedule_interval="0 6 * * *", # 매일 오전 6시에 실행하도록 설정
catchup=False
)
init_index_task = PythonOperator(
task_id="init_es_index",
python_callable=create_index_if_not_exists,
dag=dag,
)
fetch_task = PythonOperator(
task_id="fetch_papers",
python_callable=fetch_papers,
provide_context=True,
dag=dag,
)
index_task = PythonOperator(
task_id="index_papers",
python_callable=index_papers,
provide_context=True,
dag=dag,
)
init_index_task >> fetch_task >> index_task
- create_index_if_not_exists()
Elasticsearch의 인덱스를 생성하는 기능 정의
- fetch_papers()
bioRxiv API에서 논문 데이터를 크롤링하는 기능 정의
가져온 논문 데이터를 Airflow의 XCom을 사용해 다음 태스크에 전달하는 역할
- index_papers()
fetch_papers 태스크에서 전달된 데이터를 가져와 Elasticsearch의 index() 메소드를 사용해 논문 데이터를 색인하는 역할
FastAPI app.py
FastAPI를 사용해 Elasticsearch에서 논문을 검색하는 REST API를 구축한다.
from fastapi import FastAPI
from elasticsearch import Elasticsearch
app = FastAPI()
es = Elasticsearch("http://elasticsearch:9200")
@app.get("/search")
async def search_papers(query: str):
"""Elasticsearch에서 논문 검색 수행"""
body = {
"query": {
"multi_match": {
"query": query,
"fields": ["title", "abstract", "author"]
}
}
}
response = es.search(index="research_papers", body=body)
return {"results": response["hits"]["hits"]}
@app.get("/")
def read_root():
return {"message": "Hello, FastAPI!"}
GET /search?query=논문제목 방식으로 요청을 보낼 수 있으며, multi_match를 사용해 제목, 초록, 저자 필드에서 query ㅁ문자열을 검색한다.
docker-compose.yml
version: '3.7'
services:
# PostgreSQL (Airflow metadata DB)
postgres:
image: postgres:13
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "5432:5432"
networks:
- elastic_network
volumes:
- postgres_data:/var/lib/postgresql/data
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
- "9300:9300"
networks:
- elastic_network
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
container_name: kibana
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
- elastic_network
# FastAPI
fastapi:
build: ../fastapi
container_name: fastapi
ports:
- "8000:8000"
depends_on:
- elasticsearch
networks:
- elastic_network
# Airflow
airflow:
build: ../airflow
container_name: airflow
restart: always
ports:
- "8080:8080"
depends_on:
- elasticsearch
- postgres
environment:
- AIRFLOW_HOME=/opt/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
- AIRFLOW__CORE__LOAD_EXAMPLES=False
volumes:
- airflow_db:/opt/airflow
- ../airflow/dags:/opt/airflow/dags # DAG 폴더 마운트
networks:
- elastic_network
networks:
elastic_network:
driver: bridge
volumes:
postgres_data:
airflow_db:
Airflow UI 접속 및 DAG 실행
docker-compose.yml 파일을 작성 한 뒤 컨테이너를 빌드하고 띄워주자.
docker-compose up --build -d
localhost:8080에 접속하면 Airflow 로그인 UI가 나온다.
나는 Dockerfile에서 이미 사용자 계정을 생성한 상태이므로, 해당 계정으로 로그인을 해주면 된다.

연동이 잘 되었다면 내가 작성한 paper_etl이라는 DAG를 확인할 수 있다.
(success, failed가 여러 개인 이유는 다소 삽질을 했기 때문, 구체적인 트러블 슈팅 과정은 다른 게시물에 별도로 작성하도록 하겠다..)
Run 버튼을 클릭하면 DAG를 수동으로 실행시킬 수 있으며, 각 태스크가 성공적으로 수행되었는지, 실패되었는지 확인할 수 있다.
로그를 통해 구체적인 에러 원인을 잡을 수도 있다.

inin_es_index >> fetch_papers >> index_papers 태스크가 모두 잘 수행된 것을 확인할 수 있다.
그러면 요청을 보내서 어떤 데이터가 저장되었는지 확인해보자.
curl -X GET "http://localhost:9200/research_papers/_search?size=5&sort=publication_date:desc&pretty=true"

논문 데이터가 내가 원하는 구조로 잘 저장되어 있는 것을 확인할 수 있다!
다음 포스팅에서는 이후 개발 과정을 정리할 예정이다.
'Data Engineering > Elasticsearch' 카테고리의 다른 글
| [Opensearch] Opensearch 개념 정리 (0) | 2025.02.11 |
|---|---|
| [Elasticsearch] 논문 검색 엔진 구현 프로젝트 (Elasticsearch + Airflow + FastAPI) (4) (0) | 2025.02.06 |
| [Elasticsearch] 논문 검색 엔진 구현 프로젝트 (Elasticsearch + Airflow + FastAPI) (3) (0) | 2025.02.05 |
| [Elasticsearch] 논문 검색 엔진 구현 프로젝트 (Elasticsearch + Airflow + FastAPI) (2) (0) | 2025.02.05 |
| [Elasticsearch] Elasticsearch 기본 개념 (0) | 2025.02.04 |