
1. Accumulators


An Accumulator in Flink is similar in purpose to a MapReduce counter: both are a convenient way to observe how data flows through a task while it runs. You can use an accumulator inside any operator function of a Flink job, but the final accumulator value only becomes available after the job has finished. Counter is a concrete Accumulator implementation; the commonly used counters are IntCounter, LongCounter, and DoubleCounter.

Usage:

    1: Create the accumulator: private IntCounter numLines = new IntCounter();
    2: Register the accumulator: getRuntimeContext().addAccumulator("num-lines", this.numLines);
    3: Use the accumulator: this.numLines.add(1);
    4: Read the result after the job finishes: myJobExecutionResult.getAccumulatorResult("num-lines")

Example: count how many records the map operator processes.

package Flink_API;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

/**
 * Count how many records the map function processes.
 */
public class BatchCounterTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = env.fromElements("1", "2", "3", "4", "5");
        DataSet<String> map = dataSource.map(new RichMapFunction<String, String>() {
            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // 3: use the accumulator
                numLines.add(1);
                return s;
            }
        }).setParallelism(5);

        // In the DataSet API, print() executes the job immediately, so write to a
        // sink instead and call execute() to obtain the JobExecutionResult.
        map.writeAsText("/tmp/batch_counter_out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        JobExecutionResult jobResult = env.execute("BatchCounterTest");
        // 4: read the accumulator result once the job has finished
        int numLines = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num-lines: " + numLines);
    }
}

2. Broadcast variables: the broadcast data set is shipped to every TaskManager, where tasks can read it locally.

Steps for using a broadcast variable:

    1: Initialize the data: DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
    2: Broadcast (register) the data on the operator that will use it: operator.withBroadcastSet(toBroadcast, "broadcastSetName");
    3: Read the data inside that operator: Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");


Example program: Flink can obtain only the user's name from the data source, but the job must print each user's name together with their age (which arrives via the broadcast variable).

package Flink_API;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BatchBroadcastTest {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // prepare the data to broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("wtt", 29));
        broadData.add(new Tuple2<>("lidong", 30));
        broadData.add(new Tuple2<>("hengda", 40));
        DataSource<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        // convert the data set into Maps: key = user name, value = user age
        DataSet<HashMap<String, Integer>> toBroadCast = tupleData.map(
                new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(tuple.f0, tuple.f1);
                return hashMap;
            }
        }).setParallelism(3);
        // the broadcast data is now ready
        // note: use a RichMapFunction here so the broadcast variable can be fetched

        // the data source only carries the user names
        DataSource<String> nameDataSource = env.fromElements("wtt", "lidong", "hengda");
        DataSet<String> data = nameDataSource.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 1. Similar to the setup method in MR: runs only once.
             * 2. A good place for initialization work.
             * 3. The broadcast variable can be fetched inside open().
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // fetch the broadcast data
                broadCastMap = getRuntimeContext().getBroadcastVariable("toBroadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map); // final format: {"name": age}
                }
            }

            /**
             * Each call to map() can simply look the name up in allMap.
             */
            @Override
            public String map(String name) throws Exception {
                return name + "," + allMap.get(name);
            }
        }).withBroadcastSet(toBroadCast, "toBroadCastMapName"); // register the broadcast set on this operator

        data.print();
    }
}

3. Broadcast streams: what a broadcast variable is to batch processing, a broadcast stream is to stream processing.

package Flink_API;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

// broadcast stream
public class FlinkBroadcastStream {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use the timestamps carried by the data itself
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the parallelism
        env.setParallelism(1);

        // 1. the first stream: the users' browse events
        DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);
        // 2. the second stream: the user blacklist
        DataStream<BlackUser> blackUserDataStream = getUserBlackUserDataStream(env);

        // (1) define a MapStateDescriptor describing the format of the broadcast data
        MapStateDescriptor<String, BlackUser> descriptor =
                new MapStateDescriptor<String, BlackUser>("userBlackList", String.class, BlackUser.class);
        // (2) register the configuration source as a broadcast stream
        BroadcastStream<BlackUser> broadcastStream = blackUserDataStream.broadcast(descriptor);
        // (3) connect the main stream with the broadcast stream
        DataStream<UserBrowseLog> filterDataStream = browseStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<UserBrowseLog, BlackUser, UserBrowseLog>() {
            @Override
            public void processElement(UserBrowseLog value, ReadOnlyContext readOnlyContext,
                                       Collector<UserBrowseLog> collector) throws Exception {
                // look the key up in the broadcast state
                ReadOnlyBroadcastState<String, BlackUser> broadcastState =
                        readOnlyContext.getBroadcastState(descriptor);
                BlackUser blackUser = broadcastState.get(value.userID);
                if (blackUser != null) {
                    System.out.print("user " + value.userID + " is blacklisted, dropping their browse events");
                } else {
                    collector.collect(value);
                }
            }

            @Override
            public void processBroadcastElement(BlackUser value, Context context,
                                                Collector<UserBrowseLog> collector) throws Exception {
                // update the broadcast state in real time
                BroadcastState<String, BlackUser> broadcastState = context.getBroadcastState(descriptor);
                broadcastState.put(value.userID, value);
                System.out.print("------> current broadcast state: ------>");
                System.out.print(broadcastState);
            }
        });

        filterDataStream.print();
        env.execute("FlinkBroadcastStream");
    }

    private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>(
                "browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(), consumerProperties));
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("failed to parse UserBrowseLog JSON: " + e.getMessage());
                }
            }
        });
        // assign watermarks here if needed
        return processData;
    }

    private static DataStream<BlackUser> getUserBlackUserDataStream(StreamExecutionEnvironment env) {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01:9002");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>(
                "browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(), consumerProperties));
        DataStream<BlackUser> processData = dataStreamSource.process(new ProcessFunction<String, BlackUser>() {
            @Override
            public void processElement(String s, Context context, Collector<BlackUser> collector) throws Exception {
                try {
                    BlackUser blackUser = com.alibaba.fastjson.JSON.parseObject(s, BlackUser.class);
                    if (blackUser != null) {
                        collector.collect(blackUser);
                    }
                } catch (Exception e) {
                    System.out.print("failed to parse BlackUser JSON: " + e.getMessage());
                }
            }
        });
        return processData;
    }

    // blacklist configuration record
    public static class BlackUser implements Serializable {
        private String userID;
        private String userName;

        public BlackUser() {
        }

        public BlackUser(String userID, String userName) {
            this.userID = userID;
            this.userName = userName;
        }

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getUserName() { return userName; }
        public void setUserName(String userName) { this.userName = userName; }
    }

    // browse event record
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}

4. Flink Distributed Cache

  • Flink provides a distributed cache, similar to Hadoop's, that lets user code in parallel functions conveniently read a local file: the file is placed on each TaskManager node so tasks do not have to fetch it repeatedly.
  • The cache works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment under a chosen name. When the program runs, Flink automatically copies the file or directory to the local file system of every TaskManager node, exactly once. User code can then look the file or directory up by that name and access it from the TaskManager's local file system.

Registration:

//get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1: register a file; an HDFS file works, and a local file is fine for testing
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt");

Usage (note the lookup must use the registered name, "a.txt"):

 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");

 Contents of a.txt:


hello flink hello FLINK

Complete code:

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class DisCacheTest {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1: register a file; an HDFS file works, and a local file is fine for testing
        env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text", "a.txt");
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: use the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.err.println("distributed cache line: " + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is available here
                System.err.println("using dataList: " + dataList + "------------" + value);
                // business logic
                return dataList + ":" + value;
            }
        });
        result.printToErr();
    }
}

 
