"""Lambda that launches a failover EMR cluster when notified via SNS.

The primary cluster's configuration is read as JSON from S3 and translated
into keyword arguments for ``emr.run_job_flow``.
"""
import json
import logging
import os

import boto3

print('Loading function')

# Clients are created at import time so they are reused across invocations
# served by the same execution environment.
s3 = boto3.client('s3')
sm = boto3.client('secretsmanager')
emr = boto3.client('emr')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Tag key this function adds to mark the cluster as the failover copy.
_FAILOVER_TAG_KEY = 'emr-failover'
# Source tags that are not copied: 'Terraform' is intentionally dropped, and
# 'Name' / 'emr-failover' are set by this function, so copying them would
# produce duplicate tag keys.
_EXCLUDED_TAG_KEYS = frozenset({'Terraform', 'Name', _FAILOVER_TAG_KEY})


def get_emr_config():
    """Load the primary cluster's EMR configuration from S3.

    The object location comes from the ``S3_BUCKET`` and ``S3_OBJECT_KEY``
    environment variables; its content must be a JSON document.

    Returns:
        dict: the decoded JSON configuration.

    Raises:
        KeyError: if either environment variable is unset.
        json.JSONDecodeError: if the object is not valid JSON.
    """
    obj = s3.get_object(
        Bucket=os.environ['S3_BUCKET'],
        Key=os.environ['S3_OBJECT_KEY'],
    )
    body = obj['Body']
    try:
        return json.load(body)
    finally:
        # Close the streaming response body even if JSON decoding fails.
        body.close()


def _split_security_groups(value):
    """Split a comma-separated security-group string into a list of IDs.

    Whitespace around each entry is stripped and blank entries are dropped,
    so ``""`` or ``None`` yields ``[]`` instead of ``[""]`` (an empty string
    is not a valid security-group ID).
    """
    if not value:
        return []
    return [group.strip() for group in value.split(',') if group.strip()]


def _instance_group(name, role, instance_type, instance_count,
                    ebs_type, ebs_size):
    """Build one ON_DEMAND ``InstanceGroups`` entry with one EBS volume per instance."""
    return {
        'Name': name,
        'Market': 'ON_DEMAND',
        'InstanceRole': role,
        'InstanceType': instance_type,
        'InstanceCount': instance_count,
        'EbsConfiguration': {
            'EbsBlockDeviceConfigs': [
                {
                    'VolumeSpecification': {
                        'VolumeType': ebs_type,
                        'SizeInGB': ebs_size,
                    },
                    'VolumesPerInstance': 1,
                },
            ],
        },
    }


def _build_tags(source_tags, cluster_name):
    """Return the EMR ``Tags`` list for the failover cluster.

    Copies every entry of the *source_tags* mapping except those in
    ``_EXCLUDED_TAG_KEYS``, then adds ``emr-failover=true`` and
    ``Name=<cluster_name>``.
    """
    tags = [
        {'Key': key, 'Value': value}
        for key, value in source_tags.items()
        if key not in _EXCLUDED_TAG_KEYS
    ]
    tags.append({'Key': _FAILOVER_TAG_KEY, 'Value': 'true'})
    tags.append({'Key': 'Name', 'Value': cluster_name})
    return tags


def parse_emr_config(emr_config):
    """Translate the stored primary-cluster config into ``run_job_flow`` kwargs.

    Args:
        emr_config: dict as returned by ``get_emr_config``. Required keys are
            read with ``[]``. The optional keys ``kerberos_realm``,
            ``kerberos_kdc_admin_secret``, ``log_encryption_kms_key_id`` and
            ``additional_slave_security_group`` may be missing or ``None``.

    Returns:
        dict: keyword arguments for ``emr.run_job_flow``. The dict may contain
        the KDC admin password, so it must not be logged.

    Raises:
        KeyError: if a required key is missing.
    """
    # The failover cluster reuses the primary's name with its last character
    # replaced by '2' (presumably the primary's name ends in '1' -- confirm).
    name = emr_config["name"][:-1] + '2'

    master = _instance_group(
        'Master', 'MASTER',
        emr_config["master_instance_type"],
        emr_config["master_instance_count"],
        emr_config["master_ebs_config_type"],
        emr_config["master_ebs_config_size"],
    )
    core = _instance_group(
        'Core', 'CORE',
        emr_config["core_instance_type"],
        emr_config["core_instance_count"],
        emr_config["core_ebs_config_type"],
        emr_config["core_ebs_config_size"],
    )

    additional_master_sgs = emr_config["additional_master_security_group"]
    # Slaves previously (mistakenly) always received the master's additional
    # groups; keep that as the fallback when no slave-specific value exists.
    additional_slave_sgs = emr_config.get("additional_slave_security_group")
    if additional_slave_sgs is None:
        additional_slave_sgs = additional_master_sgs

    config = {
        'Name': name,
        'LogUri': emr_config["s3_log_uri"],
        'ReleaseLabel': emr_config["release_label"],
        'Instances': {
            'KeepJobFlowAliveWhenNoSteps':
                emr_config["keep_job_flow_alive_when_no_steps"],
            'TerminationProtected': emr_config["termination_protection"],
            'InstanceGroups': [master, core],
            'Ec2KeyName': emr_config["key_name"],
            'Ec2SubnetIds': emr_config["subnet_ids"],
            'EmrManagedMasterSecurityGroup': emr_config["master_security_group"],
            'EmrManagedSlaveSecurityGroup': emr_config["slave_security_group"],
            'ServiceAccessSecurityGroup': emr_config["service_security_group"],
            'AdditionalMasterSecurityGroups':
                _split_security_groups(additional_master_sgs),
            'AdditionalSlaveSecurityGroups':
                _split_security_groups(additional_slave_sgs),
        },
        'Tags': _build_tags(emr_config["tags"], name),
        'BootstrapActions': [
            {
                'Name': emr_config["bootstrap_action_name"],
                'ScriptBootstrapAction': {
                    'Path': emr_config["bootstrap_action_path"],
                },
            },
        ],
        'Applications': [{'Name': app} for app in emr_config["applications"]],
        # configurations_json is stored as a JSON-encoded string.
        'Configurations': json.loads(emr_config["configurations_json"]),
        'SecurityConfiguration': emr_config["security_configuration_name"],
        'EbsRootVolumeSize': emr_config["ebs_root_volume_size"],
        'CustomAmiId': emr_config["custom_ami_id"],
        'StepConcurrencyLevel': emr_config["step_concurrency_level"],
        'VisibleToAllUsers': emr_config["visible_to_all_users"],
        'ServiceRole': emr_config["iam_service_role"],
        'JobFlowRole': emr_config["instance_profile"],
    }

    realm = emr_config.get('kerberos_realm')
    if realm is not None:
        # Only call Secrets Manager when Kerberos is actually configured.
        kdc_admin_password = get_kdc_admin_password(
            emr_config.get('kerberos_kdc_admin_secret'))
        if kdc_admin_password is not None:
            config['KerberosAttributes'] = {
                'Realm': realm,
                'KdcAdminPassword': kdc_admin_password,
            }

    log_kms_key_id = emr_config.get("log_encryption_kms_key_id")
    if log_kms_key_id is not None:
        config['LogEncryptionKmsKeyId'] = log_kms_key_id

    return config


def get_kdc_admin_password(arn):
    """Return the current KDC admin password stored in Secrets Manager.

    Args:
        arn: identifier passed as ``SecretId``; ``None`` or empty means no
            secret is configured.

    Returns:
        str | None: the ``SecretString`` of the ``AWSCURRENT`` version, or
        ``None`` when *arn* is not set.
    """
    if not arn:
        return None
    secret = sm.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")
    return secret['SecretString']


def lambda_handler(event, context):
    """Create the failover EMR cluster in response to an SNS notification.

    Args:
        event: Lambda event whose first record is an SNS notification; its
            ``Message`` is logged.
        context: Lambda context object (unused).

    Returns:
        dict: the ``emr.run_job_flow`` response.
    """
    message = event['Records'][0]['Sns']['Message']
    logger.info("From SNS: %s", message)

    emr_config = get_emr_config()
    config = parse_emr_config(emr_config)

    logger.info("Creating failover cluster")
    response = emr.run_job_flow(**config)
    logger.info(json.dumps(response, indent=2))
    return response