# code-dumps/lambda/deploy-emr-cluster/deploy.py

# 124 lines
# 5.0 KiB
# Python

import json
import boto3
import os
import logging
# Printed once, when the module is imported.
print('Loading function')
# boto3 clients are created at module level, so they are built once per import
# and shared by every function in this file rather than rebuilt on each call.
s3 = boto3.client('s3')
sm = boto3.client('secretsmanager')
emr = boto3.client('emr')
# Root logger (no name given) with its threshold set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_emr_config():
    """Fetch the EMR settings document from S3 and return it parsed from JSON.

    The object location is taken from the ``S3_BUCKET`` and
    ``S3_OBJECT_KEY`` environment variables.

    Returns:
        The decoded JSON content of the object.

    Raises:
        KeyError: if either environment variable is not set.
    """
    obj = s3.get_object(
        Bucket=os.environ['S3_BUCKET'],
        Key=os.environ['S3_OBJECT_KEY'],
    )
    body = obj['Body']
    try:
        return json.load(body)
    finally:
        # Close the response stream whether or not parsing succeeded.
        body.close()
def _build_instance_group(emr_config, name, role, prefix):
    """Build one on-demand instance-group entry from the ``<prefix>_*`` keys."""
    return {
        'Name': name,
        'Market': 'ON_DEMAND',
        'InstanceRole': role,
        'InstanceType': emr_config[prefix + "_instance_type"],
        'InstanceCount': emr_config[prefix + "_instance_count"],
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': emr_config[prefix + "_ebs_config_type"],
                        'SizeInGB': emr_config[prefix + "_ebs_config_size"],
                    },
                    'VolumesPerInstance': 1,
                },
            ],
        },
    }


def parse_emr_config(emr_config):
    """Translate the stored EMR settings into ``run_job_flow`` keyword arguments.

    Args:
        emr_config: dict of settings as loaded from the JSON document; the
            keys read below must be present (a missing key raises).

    Returns:
        A dict that the handler unpacks into ``emr.run_job_flow(**config)``.

    Raises:
        KeyError: if a required setting is missing from ``emr_config``.
    """
    # Resolve the secret first; it returns None when no secret ARN is set.
    kdc_admin_password = get_kdc_admin_password(
        emr_config['kerberos_kdc_admin_secret'])

    config = {}
    # The new cluster's name is the configured name with its last character
    # replaced by '2'.
    config['Name'] = emr_config["name"][:-1] + '2'
    config['LogUri'] = emr_config["s3_log_uri"]
    config['ReleaseLabel'] = emr_config["release_label"]

    instances = {}
    instances['KeepJobFlowAliveWhenNoSteps'] = emr_config["keep_job_flow_alive_when_no_steps"]
    instances['TerminationProtected'] = emr_config["termination_protection"]
    instances['InstanceGroups'] = [
        _build_instance_group(emr_config, 'Master', 'MASTER', 'master'),
        _build_instance_group(emr_config, 'Core', 'CORE', 'core'),
    ]
    instances['Ec2KeyName'] = emr_config["key_name"]
    instances['Ec2SubnetIds'] = emr_config["subnet_ids"]
    instances['EmrManagedMasterSecurityGroup'] = emr_config["master_security_group"]
    instances['EmrManagedSlaveSecurityGroup'] = emr_config["slave_security_group"]
    instances['ServiceAccessSecurityGroup'] = emr_config["service_security_group"]

    master_groups = emr_config["additional_master_security_group"]
    instances['AdditionalMasterSecurityGroups'] = master_groups.split(",")
    # NOTE(review): the slave list used to be built from the *master* key,
    # which looks like a copy-paste slip. Prefer a dedicated slave key when the
    # settings provide one and otherwise keep the previous behaviour -- confirm
    # the key name against whatever produces the settings document.
    slave_groups = emr_config.get("additional_slave_security_group")
    if slave_groups is None:
        slave_groups = master_groups
    instances['AdditionalSlaveSecurityGroups'] = slave_groups.split(",")
    config['Instances'] = instances

    # Copy every tag except 'Terraform', then add the two failover tags.
    config['Tags'] = [
        {'Key': key, 'Value': value}
        for key, value in emr_config["tags"].items()
        if key != 'Terraform'
    ]
    config['Tags'].append({'Key': 'emr-failover', 'Value': 'true'})
    config['Tags'].append({'Key': 'Name', 'Value': config['Name']})

    config['BootstrapActions'] = [
        {
            'Name': emr_config["bootstrap_action_name"],
            'ScriptBootstrapAction': {
                'Path': emr_config["bootstrap_action_path"],
            },
        },
    ]
    config['Applications'] = [{'Name': name}
                              for name in emr_config["applications"]]
    # This setting is stored as a JSON string inside the document.
    config['Configurations'] = json.loads(emr_config["configurations_json"])
    config['SecurityConfiguration'] = emr_config["security_configuration_name"]
    config['EbsRootVolumeSize'] = emr_config["ebs_root_volume_size"]
    config['CustomAmiId'] = emr_config["custom_ami_id"]

    # Kerberos attributes are only sent when both a realm and a password exist.
    if emr_config['kerberos_realm'] is not None and kdc_admin_password is not None:
        config['KerberosAttributes'] = {
            'Realm': emr_config['kerberos_realm'],
            'KdcAdminPassword': kdc_admin_password,
        }

    config['StepConcurrencyLevel'] = emr_config["step_concurrency_level"]
    config['VisibleToAllUsers'] = emr_config["visible_to_all_users"]
    config['ServiceRole'] = emr_config["iam_service_role"]
    config['JobFlowRole'] = emr_config["instance_profile"]

    if emr_config["log_encryption_kms_key_id"] is not None:
        config['LogEncryptionKmsKeyId'] = emr_config["log_encryption_kms_key_id"]
    return config
def get_kdc_admin_password(arn):
    """Return the secret string stored under *arn*, or ``None`` if *arn* is ``None``.

    Args:
        arn: identifier passed to Secrets Manager as ``SecretId``, or ``None``
            when no secret is configured.

    Returns:
        The ``SecretString`` of the ``AWSCURRENT`` version, or ``None`` when
        no ARN was supplied (no lookup is made in that case).
    """
    if arn is None:
        return None
    secret = sm.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")
    return secret['SecretString']
def lambda_handler(event, context):
    """Start a new EMR cluster from the stored settings when an SNS record arrives.

    Args:
        event: invocation payload; the message text is read from
            ``event['Records'][0]['Sns']['Message']`` and logged.
        context: unused.

    Returns:
        The response returned by ``emr.run_job_flow``.

    Raises:
        KeyError / IndexError: if the event does not have the expected
            ``Records[0].Sns.Message`` structure.
    """
    emr_config = get_emr_config()
    message = event['Records'][0]['Sns']['Message']
    logger.info("From SNS: %s", message)
    config = parse_emr_config(emr_config)
    # Log the intent before the call so the message is present even if the
    # request fails.
    logger.info("Creating failover cluster")
    response = emr.run_job_flow(**config)
    logger.info(json.dumps(response, indent=2))
    return response