From 0ac84103a3f8ed94aa212e605198675e9a4b2b0b Mon Sep 17 00:00:00 2001 From: x p k Date: Thu, 1 Feb 2024 15:37:59 +0800 Subject: [PATCH] NEW: lambda function for deploying EMR cluster --- lambda/deploy-emr-cluster/README.md | 5 ++ lambda/deploy-emr-cluster/deploy.py | 123 ++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 lambda/deploy-emr-cluster/README.md create mode 100644 lambda/deploy-emr-cluster/deploy.py diff --git a/lambda/deploy-emr-cluster/README.md b/lambda/deploy-emr-cluster/README.md new file mode 100644 index 0000000..53f7b5f --- /dev/null +++ b/lambda/deploy-emr-cluster/README.md @@ -0,0 +1,5 @@ +Useful for use with cloudwatch alarm and redeploying EMR cluster should the cluster +nodes fail. For instance zonal failure. + +This lambda function was developed by IBM. + diff --git a/lambda/deploy-emr-cluster/deploy.py b/lambda/deploy-emr-cluster/deploy.py new file mode 100644 index 0000000..b1f35ed --- /dev/null +++ b/lambda/deploy-emr-cluster/deploy.py @@ -0,0 +1,123 @@ +import json +import boto3 +import os +import logging + + +print('Loading function') +s3 = boto3.client('s3') +sm = boto3.client('secretsmanager') +emr = boto3.client('emr') +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def get_emr_config(): + obj = s3.get_object( + Bucket=os.environ['S3_BUCKET'], + Key=os.environ['S3_OBJECT_KEY'] + ) + input = json.load(obj['Body']) + return input + + +def parse_emr_config(emr_config): + kdc_admin_password = get_kdc_admin_password( + emr_config['kerberos_kdc_admin_secret']) + config = {} + config['Name'] = emr_config["name"][:-1]+'2' + config['LogUri'] = emr_config["s3_log_uri"] + config['ReleaseLabel'] = emr_config["release_label"] + config['Instances'] = {} + config['Instances']['KeepJobFlowAliveWhenNoSteps'] = emr_config["keep_job_flow_alive_when_no_steps"] + config['Instances']['TerminationProtected'] = emr_config["termination_protection"] + config['Instances']['TerminationProtected'] = emr_config["termination_protection"] + master = {} + master['Name'] = 'Master' + master['Market'] = 'ON_DEMAND' + master['InstanceRole'] = 'MASTER' + master['InstanceType'] = emr_config["master_instance_type"] + master['InstanceCount'] = emr_config["master_instance_count"] + master['EbsConfiguration'] = { + 'EbsBlockDeviceConfigs': [ + { + 'VolumeSpecification': { + 'VolumeType': emr_config["master_ebs_config_type"], + 'SizeInGB': emr_config["master_ebs_config_size"], + }, + 'VolumesPerInstance': 1 + }, + ], + } + core = {} + core['Name'] = 'Core' + core['Market'] = 'ON_DEMAND' + core['InstanceRole'] = 'CORE' + core['InstanceType'] = emr_config["core_instance_type"] + core['InstanceCount'] = emr_config["core_instance_count"] + core['EbsConfiguration'] = { + 'EbsBlockDeviceConfigs': [ + { + 'VolumeSpecification': { + 'VolumeType': emr_config["core_ebs_config_type"], + 'SizeInGB': emr_config["core_ebs_config_size"], + }, + 'VolumesPerInstance': 1 + }, + ], + } + config['Instances']['InstanceGroups'] = [master, core] + config['Instances']['Ec2KeyName'] = emr_config["key_name"] + config['Instances']['Ec2SubnetIds'] = emr_config["subnet_ids"] + config['Instances']['EmrManagedMasterSecurityGroup'] = emr_config["master_security_group"] + config['Instances']['EmrManagedSlaveSecurityGroup'] = emr_config["slave_security_group"] + config['Instances']['ServiceAccessSecurityGroup'] = emr_config["service_security_group"] + config['Instances']['AdditionalMasterSecurityGroups'] = emr_config["additional_master_security_group"].split(",") + config['Instances']['AdditionalSlaveSecurityGroups'] = emr_config["additional_master_security_group"].split(",") + config['Tags'] = [{'Key': key, 'Value': value} for key, value in emr_config["tags"].items() if key != 'Terraform'] + config['Tags'].append({'Key': 'emr-failover', 'Value': 'true'}) + config['Tags'].append({'Key': 'Name', 'Value': config['Name']}) + config['BootstrapActions'] = [ + { + 'Name': emr_config["bootstrap_action_name"], + 'ScriptBootstrapAction': { + 'Path': emr_config["bootstrap_action_path"], + } + } + ] + config['Applications'] = [{'Name': name} + for name in emr_config["applications"]] + config['Configurations'] = json.loads(emr_config["configurations_json"]) + config['SecurityConfiguration'] = emr_config["security_configuration_name"] + config['EbsRootVolumeSize'] = emr_config["ebs_root_volume_size"] + config['CustomAmiId'] = emr_config["custom_ami_id"] + if emr_config['kerberos_realm'] is not None and kdc_admin_password is not None: + config['KerberosAttributes'] = {} + config['KerberosAttributes']['Realm'] = emr_config['kerberos_realm'] + config['KerberosAttributes']['KdcAdminPassword'] = kdc_admin_password + config['StepConcurrencyLevel'] = emr_config["step_concurrency_level"] + config['VisibleToAllUsers'] = emr_config["visible_to_all_users"] + config['ServiceRole'] = emr_config["iam_service_role"] + config['JobFlowRole'] = emr_config["instance_profile"] + if emr_config["log_encryption_kms_key_id"] is not None: + config['LogEncryptionKmsKeyId'] = emr_config["log_encryption_kms_key_id"] + return config + + +def get_kdc_admin_password(arn): + if arn is None: + return None + return sm.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")['SecretString'] + + +def lambda_handler(event, context): + # print("Received event: " + json.dumps(event, indent=2)) + emr_config = get_emr_config() + message = event['Records'][0]['Sns']['Message'] + logger.info("From SNS: " + message) + config = parse_emr_config(emr_config) + response = emr.run_job_flow(**config) + logger.info("Creating failover cluster") + logger.info(json.dumps(response, indent=2)) + return response +