Data Engineering/Spark

[Linux/Spark] PySpark와 MySQL을 연동해보자

seoraroong 2024. 8. 22. 00:00

홈 디렉토리에 위치해있던 mysql-connector jar 파일을 spark 디렉토리로 이동시켰다.

 

// mySQL과 PySpark 연동을 위해 세션 생성하기
mysql_spark = SparkSession.builder.config('spark.jar','mysql-connector=java-5.1.46.jar')\
.master('local').appName('pySpark_MySql').getOrCreate()

 

// 1. 생성한 새로운 세션으로 데이터프레임 가져오기
df = (
    mysql_spark.read.format('jdbc')\
    .option('url','jdbc:mysql://localhost:3306/epldatabase')
    .option('driver','com.mysql.jdbc.Driver')
    .option('dbtable', 'sparktest')
    .option('user','root').option('password','password')
    .load()
)

 

// 2. 새로운 세션을 생성하지 않고 데이터프레임 가져오기
// 1번 방식보다 2번 방식을 이용할 것을 권장 !!
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/epldatabase?useSSL=false"
tablename = "sparktest"
user = 'root'
password = 'password'
dbDataFrame = spark.read.format("jdbc")\
    .option("url", url)\
    .option("dbtable", tablename)\
    .option("driver",  driver)\
    .option("user",user)\
    .option('password',password)\
    .load()

 

// 필터링 조건을 만들어서 MySQL에 있는 테이블(데이터프레임) 데이터 읽어오기

// 필터링 조건을 predicates 라는 변수에 저장
predicates = [
    "name != 'a' and major != 'math'"
]

// JDBC 연결에 필요한 속성 정의 (드라이버, 인증 정보)
props = {'driver':driver, 'user':user, 'password':password}

// JDBC를 통해 MySQL 데이터베이스에서 데이터 읽어오기
spark.read.jdbc(url, tablename, predicates=predicates, properties=props)

 

// 위의 과정 일부분을 함수로 만들어 간단하게 적용하기
def elements(url, tablename, predicates, properties):
    print(url, tablename, predicates, properties)

params = {
    'url' : url,
    'table' : tablename,
    'predicates' : predicates,
    'properties' : props
}

spark.read.jdbc(**params).show()