飞道的博客

大数据基础-scala作为值的函数,匿名函数,闭包,柯里化,隐式转换和隐式参数,Akka并发框架,模拟简易版的spark通信

366人阅读  评论(0)

高阶函数

1.值的函数

(1)使用场景

函数可以向数字,字符串一样,可以将函数传递给一个方法

(2)示例

package com.day04

object FuncDemo {
  def main(args: Array[String]): Unit = {
    //1.创建函数,将数字转换为小星星
    val function: Int => String = (num:Int) => "*" * num
    //2.创建列表,执行转换
    val resultList = (1 to 10).map(function)
    //3.打印测试
    println(resultList)
  }
}

2.匿名函数

不给函数赋值给变量,没有赋值给变量的函数就是匿名函数

package com.day04

object FuncDemo {
  def main(args: Array[String]): Unit = {
    //1.创建函数,将数字转换为小星星
    val startList = (1 to 10).map( "*"* _)
    println(startList)
  }

}

3.柯里化

是指将原先接受多个参数的方法转换只有一个参数的多个列表的过程
def func(x,y,z) 柯里化=> def func(x)(y)(z) 调用时,由左至右依次调用
原理是首先调用x,传入x的函数的返回值,在被调用传入y,传入y的函数的返回值,在被调用传入z

package com.day04

object FuncDemo {
    //1.定义一个方法,计算两个int类型的值
    def calculate(a: Int, b: Int)(calc: (Int, Int) => Int) = {
      calc(a, b)
    }

    def main(args: Array[String]): Unit = {
      println(calculate(2, 3)(_ + _))
      println(calculate(1, 3)(_ * _))
      println(calculate(2, 3)(_ % _))
    }
}

4.闭包

闭包就是一个函数的返回值依赖于声明在函数外部的变量

package com.day04

object FuncDemo {
  def main(args: Array[String]): Unit = {
    //定义一个函数,访问函数作用域外部的变量,产生闭包
    val y =10
    val addFunc: Int => Int = (x:Int) => x+y
    println(addFunc(1))
  }
}

隐式转换

是指以implicit关键字声明的带有单个参数的方法,他说自动被调用的,自动将某种类型转换为另一种类型

1.使用步骤

  1. 在object中定义隐式转换方法(使用implicit)
  2. 在需要用到隐式转换的地方,引入隐式转换(使用import)
  3. 自动调用隐式转换后的方法

2.示例

package com.day04

import java.io.File

import scala.io.Source

object test {
  class  RichFile(val file:File){
    def read() = {
      //将文件内容读取为字符串
      Source.fromFile(file).mkString
    }
  }
  //创建隐式转换
  object ImpliciyDemo{
    //将File对象转换为RichFile对象
    implicit def fileToRichFile(file: File) = new RichFile(file)
  }

  def main(args: Array[String]): Unit = {
    val file = new File("./Data/1.txt")
    import ImpliciyDemo.fileToRichFile
    //调用隐式转换
    println(file.read())
  }
}


隐式参数

1.定义

  1. 在方法后面添加一个参数列表,参数使用implicit修饰
  2. 在object中定义implicit修饰的隐式值
  3. 调用方法,可以不传入implicit修饰的参数列,编译器会自动查找缺省值

2.示例

package com.day04


object test {
  //1.定义一个方法,这个方法有一个隐式参数
  def quote(what:String)(implicit delimeters:(String,String)): String = {
    delimeters._1 + what + delimeters._2
  }
  //2.定义一个隐式参数
  object ImplicitDemo{
    implicit val delimeterParam = ("<<",">>") //修饰一个变量
  }
  //3.调用方法执行测试
  def main(args: Array[String]): Unit = {
    import ImplicitDemo.delimeterParam
    println(quote("你好"))
  }
}

Akka并发编程框架

1.特性,编程流程

(1)特性

  • 提供基于异步非阻塞,高性能的事件驱动编程模型
  • 内置容错机制,润兴Actor出错是,进行重置
  • 使用Akka可以单机上构建高并发程序,也可以在网络中构建分布式程序

(2)Akka Actor的并发编程模型的基本流程

  1. 学生创建一个ActorSystem
  2. 通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef
  3. ActorRef将消息发送给Message Dispatcher(消息分发器)
  4. Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中
  5. Message Dispatcher将MailBox放到一个线程中
  6. MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中

2.入门案例

(1)创建maven模块

  <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(2)创建并加载Actor

创建两个Actor

  • SenderActor:用来发消息
  • ReceiveActor:用来接收,回复消息
    发现消息: 接收类对象 ! 发送内容

(2).1创建Actor

  1. 创建ActorSystem
  2. 创建自定义Actor
  3. ActorSystem加载Actor

(2).2创建SenderActor单例类发送消息

package com.yuge.akk.demo

import akka.actor.Actor

//实现akk的Actor
object SenderActor extends Actor{
  //在Actor并发模型中,需要实现的方法是act
  //想要持续接收消息使用的是loop+react
  //在akk编程模型中,直接在receive方法中写一个偏函数就可以持续接收消息了
  override def receive: Receive = {
    case "start" => {
      println("SenderActor:接收start消息")
      //发送SubmitTaskMessage消息给ReceiveActor
      //akka://actorSystem的名字/user/actor名字
      val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
      receiverActor ! SubmitTaskMessage("提交任务")
    }
    case SuccessSubmitTaskMessage(message) => println("SenderActor:收到任务成功提交消息")
  }
}


(2).3创建ReceiveActor单例类接收消息

package com.yuge.akk.demo

import akka.actor.Actor

object ReceiveActor extends Actor{
  override def receive: Receive = {
    case SubmitTaskMessage(message) => {
      println("ReceiverActor接收到任务提交消息")
      //回复消息
      sender ! SuccessSubmitTaskMessage("接收成功,感谢")
    }
  }
}

(2).4实现程序入口Entrance

package com.yuge.akk.demo

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    //2.创建ActorSystem
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) //调用方法会自动加载resources里面的配置文件
    //3.加载Actor
    val senderActor = actorSystem.actorOf(Props(SenderActor),"senderActor")
    val receiverActor = actorSystem.actorOf(Props(ReceiveActor),"receiverActor")
    //在main方法中,发送一个"start"字符串给SenderActor
    senderActor ! "start"
  }

}


(2).5创建MessageDifinition类,用于样例类来封装消息

package com.yuge.akk.demo

//提交任务消息
case class SubmitTaskMessage(message:String)

//任务提交成功消息
case class SuccessSubmitTaskMessage(message:String)

3.Akka定时任务

(1)使用场景

使用Akka框架定时的处理一些任务

(2)使用方式

Akka中,提供一个schedular对象来实现定时调度功能.使用ActorSystem.scheduler.schedule方法,可以启动一个定时任务
schedule方法针对scala提供两种使用形式

1.发送消息

def schedule(
    initialDelay: FiniteDuration,		// 延迟多久后启动定时任务
    interval: FiniteDuration,			// 每隔多久执行一次
    receiver: ActorRef,					// 给哪个Actor发送消息
    message: Any)						// 要发送的消息类型
(implicit executor: ExecutionContext)	// 隐式参数:需要手动导入

2.自定义实现

def schedule(
    initialDelay: FiniteDuration,			// 延迟多久后启动定时任务
    interval: FiniteDuration				// 每隔多久执行一次
)(f: ⇒ Unit)								// 定期要执行的函数,可以将逻辑写在这里
(implicit executor: ExecutionContext)		// 隐式参数:需要手动导入

(3)示例-发送消息

package com.yuge.akk.scheduler

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  //1.创建一个Actor,接收打印消息
  object ReceiverActor extends Actor{
    override def receive: Receive = {
      case x => println(x)
    }
  }

  def main(args: Array[String]): Unit = {
    //2.构建ActorSystem,加载Actor
    val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())
    //加载Actor
    val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor),"receiverActor")

    //导入隐式转换
    import  scala.concurrent.duration._ //方便我们引用时间
    //导入隐式参数
    import actorSystem.dispatcher //执行上下文
    //3.定时发送消息给Actor
    //3.1延迟多久
    //3.2定时任务周期
    //3.3发送消息给那个Actor
    //4.4发送消息的内容
    // 隐式转换:0这样的整形就会多出来seconds方法指定时间,
    actorSystem.scheduler.schedule(0.seconds,1 seconds,receiverActor,"hello")
  }
}

(4)示例-自定义实现

package com.yuge.akk.scheduler

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  //1.创建一个Actor,接收打印消息
  object ReceiverActor extends Actor{
    override def receive: Receive = {
      case x => println(x)
    }
  }

  def main(args: Array[String]): Unit = {
    //2.构建ActorSystem,加载Actor
    val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())
    //加载Actor
    val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor),"receiverActor")

    //导入隐式转换
    import  scala.concurrent.duration._ //scala下的并发包中的duration,方便我们引用时间
    //导入隐式参数
    import actorSystem.dispatcher //执行上下文

    ///3.定时发送消息给Actor
    //3.1延迟多久
    // 隐式转换:0这样的整形就会多出来seconds方法指定时间,
    actorSystem.scheduler.schedule(0 seconds,1 seconds){
        //业务逻辑
      receiverActor ! "hello"
    }
  }
}

(5)注意事项

  1. 需要导入隐式转换import scala.concurrent.duration._才能调用0 seconds方法
  2. 需要导入隐式参数 import actorSystem.dispatcher 才能启动定时任务

4.Akka进程间通信

(1)WorkerActor进行网络配置

在resources创建名称为application.conf文件写入以下内容

akka.actor.provider = "akka.remote.RemoteActorRefProvider" //支持远程通信
akka.remote.netty.tcp.hostname="127.0.0.1" //IP地址
akka.remote.netty.tcp.port="9999" //端口号

(2)创建单例对象WorkActor用来接收消息

package com.yuge.akka

import akka.actor.Actor

object WorkerActor extends Actor{
  override def receive: Receive = {
    //编写业务逻辑
    case x =>println(x)
  }
}

(3)创建Entrance入口

在这里插入代码片

5.简易版spark通信框架案例

  • 一个Master管理Worker
  • 若干个Work
    • 注册
    • 发送心跳

(1)实现思路

本案例分为三个阶段来实现:

  1. Worker注册阶段
    • Worker进程向Master注册(将自己的ID、CPU核数、内存大小(M)发送给Master)
  2. Worker定时发送心跳阶段
    Worker定期向Master发送心跳消息
  3. Master定时心跳检测阶段
    Master定期检查Worker心跳,将一些超时的Worker移除,并对Worker按照内存进行倒序排序
  4. 多个Worker测试阶段
    抽取Worker参数,通过命令行参数接收Worker参数(绑定端口号、CPU、内存)

(2)工程搭建

1.项目分块

工程名 说明
spark-demo-common 存放公共消息,实体类
spark-demo-master Akka Master节点
spark-demo-worker Akka Work节点

2.maven依赖(借助上面的maven)

//增加一个这个引用一下common模块maven
<dependency>
            <groupId>com.ityuge</groupId>
            <artifactId>spark-demo-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

3.导入配置文件application.conf(在同一台机器上不能有端口冲突)

- 修改Master的端口为7000
- 修改Worker的端口为7100
akka.actor.provider = "akka.remote.RemoteActorRefProvider" //支持远程通信
akka.remote.netty.tcp.hostname="127.0.0.1" //IP地址
akka.remote.netty.tcp.port="7000" //端口号
akka.actor.provider = "akka.remote.RemoteActorRefProvider" //支持远程通信
akka.remote.netty.tcp.hostname="127.0.0.1" //IP地址
akka.remote.netty.tcp.port="7100" //端口号

(3)构建Master和Work

1.步骤

  1. 创建并加载Master Actor
  2. 创建并加载Worker Actor
  3. 测试是否启动成功

2.

3.


转载:https://blog.csdn.net/weixin_45154559/article/details/106220863
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场