124 lines
5.0 KiB
Python
124 lines
5.0 KiB
Python
import json
|
|
import boto3
|
|
import os
|
|
import logging
|
|
|
|
|
|
print('Loading function')

# AWS service clients. They are created once, when the module is imported,
# so every call into the handler reuses the same client objects.
s3, sm, emr = (
    boto3.client(service_name)
    for service_name in ('s3', 'secretsmanager', 'emr')
)

# getLogger() with no name returns the root logger; emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
|
|
|
|
|
|
def get_emr_config():
    """Load the EMR cluster settings document (JSON) from S3.

    The object location comes from the ``S3_BUCKET`` and ``S3_OBJECT_KEY``
    environment variables.

    Returns:
        The decoded JSON document; ``parse_emr_config`` expects it to be a
        dict of flat settings.

    Raises:
        KeyError: if either environment variable is unset.
        json.JSONDecodeError: if the object body is not valid JSON.
    """
    response = s3.get_object(
        Bucket=os.environ['S3_BUCKET'],
        Key=os.environ['S3_OBJECT_KEY'],
    )
    body = response['Body']
    try:
        document = json.load(body)
    finally:
        # Close the streaming body even when decoding fails so the
        # underlying HTTP response stream is released.
        body.close()
    return document
|
|
|
|
|
|
def _split_csv(value):
    """Split a comma-separated ID string into a list, dropping blank items.

    ``None`` and ``""`` both yield ``[]``, and whitespace around each item is
    stripped (``"sg-1, sg-2"`` -> ``["sg-1", "sg-2"]``).
    """
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


def _instance_group(name, role, emr_config, prefix):
    """Build one ON_DEMAND ``InstanceGroups`` entry for ``run_job_flow``.

    Reads ``<prefix>_instance_type``, ``<prefix>_instance_count``,
    ``<prefix>_ebs_config_type`` and ``<prefix>_ebs_config_size`` from
    *emr_config* and attaches exactly one EBS volume per instance.
    """
    return {
        'Name': name,
        'Market': 'ON_DEMAND',
        'InstanceRole': role,
        'InstanceType': emr_config[prefix + "_instance_type"],
        'InstanceCount': emr_config[prefix + "_instance_count"],
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': emr_config[prefix + "_ebs_config_type"],
                        'SizeInGB': emr_config[prefix + "_ebs_config_size"],
                    },
                    'VolumesPerInstance': 1,
                },
            ],
        },
    }


def parse_emr_config(emr_config):
    """Translate the stored cluster settings into ``emr.run_job_flow`` kwargs.

    Args:
        emr_config: dict of flat settings (``name``, ``release_label``,
            ``master_instance_type``, ``tags``, ...) as loaded by
            ``get_emr_config``.

    Returns:
        dict of keyword arguments for the EMR ``RunJobFlow`` API.

    The new cluster is named after the source cluster with its last character
    replaced by ``'2'`` (``"analytics-1"`` -> ``"analytics-2"``). It is tagged
    ``emr-failover=true``, and its ``Name`` tag is set to that new name.
    ``KerberosAttributes`` is only added when ``kerberos_realm`` is set and a
    KDC admin password could be resolved; the secret is only fetched in that
    case.
    """
    name = emr_config["name"][:-1] + '2'

    config = {
        'Name': name,
        'LogUri': emr_config["s3_log_uri"],
        'ReleaseLabel': emr_config["release_label"],
    }

    additional_master_groups = emr_config["additional_master_security_group"]
    # Use a dedicated slave setting when the document provides one; otherwise
    # reuse the master setting (which is what was previously sent for both).
    # NOTE(review): confirm the key name emitted by whatever writes the
    # config document.
    additional_slave_groups = emr_config.get(
        "additional_slave_security_group", additional_master_groups)

    config['Instances'] = {
        'KeepJobFlowAliveWhenNoSteps': emr_config["keep_job_flow_alive_when_no_steps"],
        'TerminationProtected': emr_config["termination_protection"],
        'InstanceGroups': [
            _instance_group('Master', 'MASTER', emr_config, 'master'),
            _instance_group('Core', 'CORE', emr_config, 'core'),
        ],
        'Ec2KeyName': emr_config["key_name"],
        'Ec2SubnetIds': emr_config["subnet_ids"],
        'EmrManagedMasterSecurityGroup': emr_config["master_security_group"],
        'EmrManagedSlaveSecurityGroup': emr_config["slave_security_group"],
        'ServiceAccessSecurityGroup': emr_config["service_security_group"],
        'AdditionalMasterSecurityGroups': _split_csv(additional_master_groups),
        'AdditionalSlaveSecurityGroups': _split_csv(additional_slave_groups),
    }

    # Copy the source tags except the Terraform marker and the keys that are
    # re-set below; otherwise a source 'Name' tag would be sent twice.
    overridden_keys = {'Terraform', 'emr-failover', 'Name'}
    config['Tags'] = [
        {'Key': key, 'Value': value}
        for key, value in emr_config["tags"].items()
        if key not in overridden_keys
    ]
    config['Tags'].append({'Key': 'emr-failover', 'Value': 'true'})
    config['Tags'].append({'Key': 'Name', 'Value': name})

    config['BootstrapActions'] = [
        {
            'Name': emr_config["bootstrap_action_name"],
            'ScriptBootstrapAction': {
                'Path': emr_config["bootstrap_action_path"],
            },
        },
    ]
    config['Applications'] = [{'Name': app} for app in emr_config["applications"]]
    # Stored as a JSON string in the settings document; decoded here.
    config['Configurations'] = json.loads(emr_config["configurations_json"])
    config['SecurityConfiguration'] = emr_config["security_configuration_name"]
    config['EbsRootVolumeSize'] = emr_config["ebs_root_volume_size"]
    config['CustomAmiId'] = emr_config["custom_ami_id"]

    realm = emr_config['kerberos_realm']
    if realm is not None:
        # Only call Secrets Manager when Kerberos is actually configured.
        kdc_admin_password = get_kdc_admin_password(
            emr_config['kerberos_kdc_admin_secret'])
        if kdc_admin_password is not None:
            config['KerberosAttributes'] = {
                'Realm': realm,
                'KdcAdminPassword': kdc_admin_password,
            }

    config['StepConcurrencyLevel'] = emr_config["step_concurrency_level"]
    config['VisibleToAllUsers'] = emr_config["visible_to_all_users"]
    config['ServiceRole'] = emr_config["iam_service_role"]
    config['JobFlowRole'] = emr_config["instance_profile"]

    kms_key_id = emr_config["log_encryption_kms_key_id"]
    if kms_key_id is not None:
        config['LogEncryptionKmsKeyId'] = kms_key_id

    return config
|
|
|
|
|
|
def get_kdc_admin_password(arn):
    """Fetch the Kerberos KDC admin password from AWS Secrets Manager.

    Args:
        arn: ARN (or name) of the secret, or ``None`` / ``""`` when no secret
            is configured.

    Returns:
        The ``SecretString`` of the secret's ``AWSCURRENT`` version, or
        ``None`` when *arn* is empty.
    """
    if not arn:
        # Treat an empty string like None rather than sending an invalid
        # SecretId to Secrets Manager.
        return None
    secret = sm.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")
    return secret['SecretString']
|
|
|
|
|
|
def lambda_handler(event, context):
    """Launch a failover EMR cluster in response to an SNS notification.

    Args:
        event: Lambda event; the first record's SNS ``Message`` is logged.
        context: Lambda context object (unused).

    Returns:
        The response of ``emr.run_job_flow``.
    """
    # Read the SNS message before touching S3 so a malformed event fails
    # without doing any remote work.
    message = event['Records'][0]['Sns']['Message']
    logger.info("From SNS: %s", message)

    config = parse_emr_config(get_emr_config())

    # Log before the request so the log order matches what actually happens.
    logger.info("Creating failover cluster")
    response = emr.run_job_flow(**config)
    logger.info(json.dumps(response, indent=2))
    return response
|
|
|