NEW: lambda function for deploying EMR cluster
This commit is contained in:
parent
9dd4bc0b4e
commit
0ac84103a3
5
lambda/deploy-emr-cluster/README.md
Normal file
5
lambda/deploy-emr-cluster/README.md
Normal file
@ -0,0 +1,5 @@
|
||||
Useful for use with cloudwatch alarm and redeploying EMR cluster should the cluster
|
||||
nodes fail. For instance zonal failure.
|
||||
|
||||
This lambda function was developed by IBM.
|
||||
|
123
lambda/deploy-emr-cluster/deploy.py
Normal file
123
lambda/deploy-emr-cluster/deploy.py
Normal file
@ -0,0 +1,123 @@
|
||||
import json
|
||||
import boto3
|
||||
import os
|
||||
import logging
|
||||
|
||||
|
||||
print('Loading function')
|
||||
s3 = boto3.client('s3')
|
||||
sm = boto3.client('secretsmanager')
|
||||
emr = boto3.client('emr')
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
def get_emr_config():
|
||||
obj = s3.get_object(
|
||||
Bucket=os.environ['S3_BUCKET'],
|
||||
Key=os.environ['S3_OBJECT_KEY']
|
||||
)
|
||||
input = json.load(obj['Body'])
|
||||
return input
|
||||
|
||||
|
||||
def parse_emr_config(emr_config):
|
||||
kdc_admin_password = get_kdc_admin_password(
|
||||
emr_config['kerberos_kdc_admin_secret'])
|
||||
config = {}
|
||||
config['Name'] = emr_config["name"][:-1]+'2'
|
||||
config['LogUri'] = emr_config["s3_log_uri"]
|
||||
config['ReleaseLabel'] = emr_config["release_label"]
|
||||
config['Instances'] = {}
|
||||
config['Instances']['KeepJobFlowAliveWhenNoSteps'] = emr_config["keep_job_flow_alive_when_no_steps"]
|
||||
config['Instances']['TerminationProtected'] = emr_config["termination_protection"]
|
||||
config['Instances']['TerminationProtected'] = emr_config["termination_protection"]
|
||||
master = {}
|
||||
master['Name'] = 'Master'
|
||||
master['Market'] = 'ON_DEMAND'
|
||||
master['InstanceRole'] = 'MASTER'
|
||||
master['InstanceType'] = emr_config["master_instance_type"]
|
||||
master['InstanceCount'] = emr_config["master_instance_count"]
|
||||
master['EbsConfiguration'] = {
|
||||
'EbsBlockDeviceConfigs': [
|
||||
{
|
||||
'VolumeSpecification': {
|
||||
'VolumeType': emr_config["master_ebs_config_type"],
|
||||
'SizeInGB': emr_config["master_ebs_config_size"],
|
||||
},
|
||||
'VolumesPerInstance': 1
|
||||
},
|
||||
],
|
||||
}
|
||||
core = {}
|
||||
core['Name'] = 'Core'
|
||||
core['Market'] = 'ON_DEMAND'
|
||||
core['InstanceRole'] = 'CORE'
|
||||
core['InstanceType'] = emr_config["core_instance_type"]
|
||||
core['InstanceCount'] = emr_config["core_instance_count"]
|
||||
core['EbsConfiguration'] = {
|
||||
'EbsBlockDeviceConfigs': [
|
||||
{
|
||||
'VolumeSpecification': {
|
||||
'VolumeType': emr_config["core_ebs_config_type"],
|
||||
'SizeInGB': emr_config["core_ebs_config_size"],
|
||||
},
|
||||
'VolumesPerInstance': 1
|
||||
},
|
||||
],
|
||||
}
|
||||
config['Instances']['InstanceGroups'] = [master, core]
|
||||
config['Instances']['Ec2KeyName'] = emr_config["key_name"]
|
||||
config['Instances']['Ec2SubnetIds'] = emr_config["subnet_ids"]
|
||||
config['Instances']['EmrManagedMasterSecurityGroup'] = emr_config["master_security_group"]
|
||||
config['Instances']['EmrManagedSlaveSecurityGroup'] = emr_config["slave_security_group"]
|
||||
config['Instances']['ServiceAccessSecurityGroup'] = emr_config["service_security_group"]
|
||||
config['Instances']['AdditionalMasterSecurityGroups'] = emr_config["additional_master_security_group"].split(",")
|
||||
config['Instances']['AdditionalSlaveSecurityGroups'] = emr_config["additional_master_security_group"].split(",")
|
||||
config['Tags'] = [{'Key': key, 'Value': value} for key, value in emr_config["tags"].items() if key != 'Terraform']
|
||||
config['Tags'].append({'Key': 'emr-failover', 'Value': 'true'})
|
||||
config['Tags'].append({'Key': 'Name', 'Value': config['Name']})
|
||||
config['BootstrapActions'] = [
|
||||
{
|
||||
'Name': emr_config["bootstrap_action_name"],
|
||||
'ScriptBootstrapAction': {
|
||||
'Path': emr_config["bootstrap_action_path"],
|
||||
}
|
||||
}
|
||||
]
|
||||
config['Applications'] = [{'Name': name}
|
||||
for name in emr_config["applications"]]
|
||||
config['Configurations'] = json.loads(emr_config["configurations_json"])
|
||||
config['SecurityConfiguration'] = emr_config["security_configuration_name"]
|
||||
config['EbsRootVolumeSize'] = emr_config["ebs_root_volume_size"]
|
||||
config['CustomAmiId'] = emr_config["custom_ami_id"]
|
||||
if emr_config['kerberos_realm'] is not None and kdc_admin_password is not None:
|
||||
config['KerberosAttributes'] = {}
|
||||
config['KerberosAttributes']['Realm'] = emr_config['kerberos_realm']
|
||||
config['KerberosAttributes']['KdcAdminPassword'] = kdc_admin_password
|
||||
config['StepConcurrencyLevel'] = emr_config["step_concurrency_level"]
|
||||
config['VisibleToAllUsers'] = emr_config["visible_to_all_users"]
|
||||
config['ServiceRole'] = emr_config["iam_service_role"]
|
||||
config['JobFlowRole'] = emr_config["instance_profile"]
|
||||
if emr_config["log_encryption_kms_key_id"] is not None:
|
||||
config['LogEncryptionKmsKeyId'] = emr_config["log_encryption_kms_key_id"]
|
||||
return config
|
||||
|
||||
|
||||
def get_kdc_admin_password(arn):
|
||||
if arn is None:
|
||||
return None
|
||||
return sm.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")['SecretString']
|
||||
|
||||
|
||||
def lambda_handler(event, context):
|
||||
# print("Received event: " + json.dumps(event, indent=2))
|
||||
emr_config = get_emr_config()
|
||||
message = event['Records'][0]['Sns']['Message']
|
||||
logger.info("From SNS: " + message)
|
||||
config = parse_emr_config(emr_config)
|
||||
response = emr.run_job_flow(**config)
|
||||
logger.info("Creating failover cluster")
|
||||
logger.info(json.dumps(response, indent=2))
|
||||
return response
|
||||
|
Loading…
Reference in New Issue
Block a user