hwaaaaaa 2023-06-05 15:11
浏览 37
已结题

test,百度,spark,测试不用回答


$.get("/a1").done(function (data) {
        myChart1.setOption({
            xAxis: {
                type: 'category',
                data: data.year
            },
            yAxis: {
                type: 'value'
            },
            series: [
                {
                    data: data.num,
                    type: 'line'
                }
            ]
        })

    })
```python
@app.route("/a1")
def a1():
    conn = pymysql.connect(
        host="localhost",
        user="root",
        password="baidu123",
        database="test"

    )
    cursor = conn.cursor()
    cursor.execute("select * from exam1")
    data = cursor.fetchall()
    exam_map = {"year": [],"num":[]}
    for row in data:
        exam_map.get("year").append(row[0])
        exam_map.get("num").append(row[1])
    return jsonify(exam_map)
```java
try {
                String[] line = value.toString().split(",");
                context.write(new Text(line[0]), new LongWritable(Long.parseLong(line[8])));
            } catch (Exception e) {
                return;
            }

long sum = 0, count = 0,max=0,min=400;
            for (LongWritable value : values) {
                sum += value.get();
                count++;
                max=Math.max(max,value.get());
                if(min>value.get()) {
                    min=Math.min(min,value.get());
                }
            }
            context.write(key, new LongWritable(max));
            context.write(key, new LongWritable(min));
```python
#spark
case class ka(time:String,word: String, count: Int)

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\Users\\hw\\Desktop\\中级实战\\资料\\hadoop-2.9.2")
//    val conf: SparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[2]")
    val sess=SparkSession.builder().appName("spark_kafka").master("local[2]").getOrCreate()
    val sc =sess.sparkContext
    sc.setLogLevel("ERROR")
    //拉取时间
    val ssc = new StreamingContext(sc, Seconds(5))

    //kafka配置
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "master:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    //主题
    val topics: Array[String] = Array("order")
    //kafka数据流
    val stream: InputDStream[ConsumerRecord[String, String]] = {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }
    import sess.implicits._

    val prop=new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    val url="jdbc:mysql://localhost/test"

    stream.foreachRDD(
      x => {
        println("Time:" + new Date())
        val value: RDD[(String, String,String)] = x.map(record => (record.key, record.value,record.value))
        val filter = value.filter(!_._2.equals(""))
        val words = filter.flatMap(_._2.split(" ")).map(x => (x, 1))
        val count = words.reduceByKey(_ + _)
        val sdf=new SimpleDateFormat("YY:mm:dd HH:mm:ss")
        count.foreach(println)
        count.map(x=>ka(sdf.format(new Date()),x._1,x._2)).toDF()
          .write.mode(SaveMode.Append).jdbc(url,"20205080910068朱汉威",prop)

      })

    println("----------开始计算-------------")
    ssc.start()
    println("----------任务正在执行-------------")
    ssc.awaitTermination()
  }

  • 写回答

1条回答 默认 最新

  • hwaaaaaa 2023-06-12 11:14
    关注
    #数据库可视化
    import DBHelpe
    import pandas as pd
    import  matplotlib.pyplot as plt
    import warnings
    #忽略警告信息
    warnings.filterwarnings('ignore')
    plt.rcParams['font.sans-serif']=['SimHei']
    conn=DBHelpe.MyDBHelpe()
    plt.figure()#创建一个画布
    print(conn)
    #读取sql语句,sql文件
    df=pd.read_sql('select *  from tb_lianjia',con=conn.conn)
    df=df.groupby(by='type')['price'].sum()
    print(df)
    plt.subplot(2,2,1)#分割成2*2的矩阵,矩阵的第一个画柱状图
    df.plot(kind='bar')
    #通过sql语句进行可视化
    
    df1=pd.read_sql("SELECT address,count(*)as 数量 FROM tb_lianjia GROUP BY address ORDER BY 数量 DESC LIMIT 10",con=conn.conn)
    lables=df1['address']
    x=df1['数量']
    plt.subplot(2,2,2)
    plt.pie(x,labels=lables,autopct='%2.f%%')
    #根据不同地区平均价格画条形图
    df2=pd.read_sql("SELECT address,avg(price) as 平均价格 from tb_lianjia GROUP BY address ORDER BY  平均价格 DESC LIMIT 20",con=conn.conn)
    plt.subplot(2,2,3)
    plt.barh(df2['address'],df2['平均价格'],color='r')
    plt.show()
    
    '''
    爬取链家二手房数据,保存到MySQL数据库中
    id, title,address,type,area,price,model
    '''
    import requests
    import DBHelpe
    from bs4 import  BeautifulSoup
    import time
    import lxml
    db=DBHelpe.MyDBHelpe()
    num=int(input("请输入你爬取的页数:"))
    for page in range(num):
        url="https://sh.lianjia.com/ershoufang/pg{}/".format(page+1)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36'}
        page_text=requests.get(url,headers).text
        time.sleep(3)
        print("第"+str(page+1)+"页开始爬取完成!!!!!!")
        #解析源码
        soup=BeautifulSoup(page_text,'lxml')
    
        li_list=soup.find("ul",{"class":"sellListContent"}).find_all("li",{"class":"LOGCLICKDATA"})
        #获取每个字段数据
        for li in  li_list:
            title=li.find("div",{"class":"title"}).a.text
            print(title)
            address=li.find("div",{"class":"positionInfo"}).a.text
            #先找到父级div,在查询子的a标准
            address1 = li.find("div", {"class": "positionInfo"}).find_next("a").find_next_sibling("a").text
            print(address,address1)
            #3室2厅 | 128.33平米 | 南 | 简装 | 高楼层(共22层) | 2011年建 | 板楼
            #.......
            #保存到数据库
            sql="insert into tb_house(title,address,type,area,price,model) values('%s','%s','%s','%s','%s','%s')"%(title,address+address1,)
            db.add(sql)
    
    
    #MyDBHelpe
    '''
    面向对象进行封装,数据库操作类
    '''
    import pymysql
    class MyDBHelpe:
        #初始化连接数据库
        def __init__(self):
            self.conn = pymysql.connect(host="localhost", port=3306, user="root", passwd="baidu123", database="domedb")
            #创建游标
            self.cur=self.conn.cursor()
        #销毁
        def __del__(self):
            self.cur.close()
            self.conn.close()
            print("数据库连接已关闭!!!!!!")
        #添加方法
        def add(self,sql):
            self.cur.execute(sql)
            # 把游标的操作提交到数据库
            self.conn.commit()  # 增加,删除,修改需要
            print("数据插入成功!!!!")
        def update(self,sql):
            self.cur.execute(sql)
            # 把游标的操作提交到数据库
            self.conn.commit()  # 增加,删除,修改需要
            print("数据更新成功!!!!")
        def delete(self,sql):
            self.cur.execute(sql)
            # 把游标的操作提交到数据库
            self.conn.commit()  # 增加,删除,修改需要
            print("数据删除成功!!!!")
    
        #查询所有记录
        def findAll(self,sql):
            self.cur.execute(sql)
            data=self.cur.fetchall()
            return  data
    
    
    评论

报告相同问题?

问题事件

  • 系统已结题 6月13日
  • 创建了问题 6月5日

悬赏问题

  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)
  • ¥20 matlab yalmip kkt 双层优化问题
  • ¥15 如何在3D高斯飞溅的渲染的场景中获得一个可控的旋转物体