webMagic 爬虫使用

现在的主流爬虫一般是使用python的scrapy来爬取数据,我主流技术栈是JAVA,py虐懂皮毛,上网搜索了下java的爬虫,发现有个国产的爬虫webMagic,网上使用率比较高。API简单易懂

查看官方文档,webMagic官网 不知道为什么 我电脑打不开,找到里面的docs,有提供简单的示例和各种的常见操作处理,一般作者的博客都是oschina上,可以去上面查看爬虫相关的博客

我这边先准备爬一下天气网中国天气官网的数据练手,webMagic封装了几个常用的组件
组件图

组件 用途
DownLoader 网页的处理:普通静态网页,selenium JS渲染抓取
PageProcesser 整个HTML会载入这个processer,可以通过xpath,正则,JS抓取
Scheduler 多个请求网页计划的容器,可以使用优先规则,先进先出,都实现了重复请求的排除策略,防止重复抓取
Pipeline 处理从processer过来的数据,可以自行处理保存到数据库或者其他地方
页面内容获取

抓取的网页属于动态网页,需要使用载体来渲染页面,一开始使用的是phantomjs,爬取的时候 总是会出现一些莫名的错误,后来我改用了chromeDriver的headless模式来抓取。config.ini文件的配置如下:

What WebDriver to use for the tests
driver=chrome

config.ini的读取设置

ClassPathResource classPathResource = new ClassPathResource("/config.ini");
String path = classPathResource.getURL().getPath();

System.setProperty("webdriver.chrome.driver", "E:\\workdocument\\ww\\chromedriver.exe");
System.setProperty("selenuim_config", path);

上去配置了驱动路径和config文件的路径,作者上面的有个key命名写错了,应该是selenium。在代码中会读取config.ini中的属性来采用哪种驱动类型来渲染页面

processer的处理

分析网页我采用的是xpath的表达式,感觉写复杂的网页内容定向的时候也比较简单,还支持一些高级的表达式功能。参考网址
xpath 先爬取了常用的7天的天气数据

for (int i = 1; i <= 7; i++) {
            // 1 day
            // 白天气象文字
            Air7Day air7Day = new Air7Day();
            String dayWord = page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/p[1]/text()").toString();
            String daytimeCode = page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/big[1]/@class").toString();
            String dayNightCode = page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/big[1]/@class").toString();

            int highTemperature = Integer.parseInt(page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/p[2]/span/text()").toString());
            int lowTemperature = Integer.parseInt(page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/p[2]/i/text()").toString().replace("℃", ""));

            String windDirection = page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/p[3]/em/span[1]/@title").toString();
            String windScale = page.getHtml().xpath("//div[@id='7d']/ul/li["+ i +"]/p[3]/i/text()").toString();

            if (dayWord.contains("转")) {
                String[] dayWordArray = dayWord.split("转");
                String daytime = dayWordArray[0];
                String dayNight = dayWordArray[1];

                air7Day.setTextForDay(daytime);
                air7Day.setTextForNight(dayNight);
            } else {
                air7Day.setTextForDay(dayWord);
                air7Day.setTextForNight(dayWord);
            }

            air7Day.setCodeForDay(daytimeCode.substring(daytimeCode.length() - 2, daytimeCode.length()));
            air7Day.setCodeForNight(dayNightCode.substring(dayNightCode.length() - 2, dayNightCode.length()));

            air7Day.setHighTemperature(highTemperature);
            air7Day.setLowTemperature(lowTemperature);

            air7Day.setWindDirection(windDirection);
            air7Day.setWindScale(windScale);

            air7Day.setCityId(101020100);
            air7Day.setWDate(DateUtil.shiftDate(i - 1));

            air7DayList.add(air7Day);

        }
pipeline处理

这边暂时就比较简单,直接存入了数据库,存入之前去了一下重复数据

if (!resultItems.isSkip()) {
            List<Air7Day> air7DayList = resultItems.get("air7DayList");
            air7DayList.forEach(air7Day -> {
                // 判断是否存在
                int c = (int) sqlSessionHandler.selectOne("airDailyMapper.air:daily:exists", air7Day);
                if (c > 0) {
                    // 存在更新
                    sqlSessionHandler.insert("airDailyMapper.air:daily:update", air7Day);
                } else {
                    // 不存在则插入
                    sqlSessionHandler.insert("airDailyMapper.air:daily:insert", air7Day);
                }
            });
        }

skip来定义这个resultItems是否跳过,可以通过这个判断来决定是否做后续的处理.

过程中遇到的问题
  1. webMagic-core包中的log4j的冲突,解决方式是排除

    compile('us.codecraft:webmagic-core:0.7.3'){
         exclude group:'org.slf4j',module:'slf4j-log4j12'
     }
    compile('us.codecraft:webmagic-selenium:0.7.3'){
         exclude group:'org.slf4j',module:'slf4j-log4j12'
         exclude group:'com.github.detro',module:'phantomjsdriver'
     }
    
  2. 如果使用的是phantomjs的话,关联jar包请使用
    compile(‘us.codecraft:webmagic-core:0.7.3’)
    compile(‘com.github.detro:ghostdriver:2.1.0’)

  3. chrome无头模式时,暂时没有找到设置chromeOption的地方
    直接在重写了一遍downloader,处理了一下无头模式,防止chrome弹出的问题

暂时还没用到太多复杂的爬虫操作,整体上都是可以很轻松的解决,满足正常的爬虫需求,不得不赞一下webMagic,文档也写的很细心,一个优秀的爬虫框架。

java nio

参照网址 http://ifeve.com/overview/

学习netty之前需要先弄懂NIO各个组件的基本功能

NIO 主要是由Channel,Selector,Buffer组成

Channel的实现如下:

  • FileChannel – 从文件中读取数据
  • DatagramChannel – UDP读写网络中读取数据
  • SocketChannel – TCP读写网络中读取数据
  • ServerSocketChannel – 监听新的TCP连接 每一个新进来的连接都会创建对应的SocketChannel

FileChannel的简单示例如下,去掉之前的inputstream,outputstream读写,改成了nio的byteBuffer读写

    public void readFile() throws IOException, URISyntaxException {

        URL url = this.getClass().getResource("/data.txt");
        RandomAccessFile file = new RandomAccessFile(new File(url.toURI()), "rw");
        FileChannel channel = file.getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(20);
        int read = channel.read(buffer);

        while (read != -1) {

            System.out.println("read: " + read);
            buffer.flip();

            while (buffer.hasRemaining()) {
                System.out.println(buffer.get());
            }

            buffer.clear();
            read = channel.read(buffer);

        }

        channel.close();
        file.close();

    }

可以使用nio的Files来读取每一行数据,是配合Path对象来处理每行数据的,使用lambda表达式打印每一行数据

    public void readFile() throws IOException, URISyntaxException {

        URL url = this.getClass().getResource("/data.txt");

        Files.lines(Paths.get(url.toURI())).forEach(line -> {
            System.out.println(line);
        });

    }

对比上来说 整洁了不少

Buffer的类型如下:没有BooleanBuffer,相应的多一个内存映射MappedByteBuffer

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • FloatBuffer
  • LongBuffer
  • ShortBuffer

flip方法将Buffer从写模式切换到读模式
cap,pos,lim

Buffer分配

  • ByteBuffer.allocate(215)

Buffer写入数据

  • channel.read(buffer)
  • buffer.put(215)

Buffer读出数据

  • channel.write(buffer)
  • buffer.get()
  • buffer.getInt()

下面这些组件都需要慢慢琢磨学习

Selector
创建一个Selector使用如下方法
Selector selector = Selector.open();

Selector一般是配合channel使用,把channel注册到selector上,如果配合使用的时候必须使用非阻塞模式,FileChannel不能注册到Selector上。
注册监测的事件包含以下几种

  • connect – 连接就绪
  • accept – 接收就绪
  • read – 读就绪
  • write – 写就绪

如果需要监听多个事件 使用SelectionKey.OP_READ | SelectionKey.OP_WRITE 或的方式可以同时监听多个事件

SocketChannel,ServerSocketChannel

类似FileChannel可以配合Selector更好的监控渠道的状态

DatagramChannel

是一个发送UDP包的通道,无连接的网络协议,发送和接收的都是数据包

Pipe

java nio 可以通过两个线程之间单向数据连接,数据写入到sink,从source读取

nio pipe

sink通道的获取
Pipe.SinkChannel sinkChannel = pipe.sink();

source通道的获取
Pipe.SourceChannel sourceChannel = pipe.source();

前面都是一些零散的基础概念
主要是为了后面学习netty做准备

netty中会持有两个EventLoopGroup,其中一个boss组是来接收客户端发来的TCP链接请求的,worker组是用来完成三次我搜的链接套接字的网络IO请求。
三次握手的TCP时序图
tcp status

又从正网弄了一个TCP状态跃迁图
tcp status

服务器上tcp连接都会处于其中的某些状态
tcp server status

time_wait数相对过多,发起socket主动关闭的一方 socket将进入TIME_WAIT状态,主要是由于socket keeplive有时间限制,到一定时间时 会主动关闭socket连接,但是客户端还没有响应关闭。

每一个socket都会打开一个socketChannel,channel中的数据会通过channelPipeline来执行多个handler来处理数据。
tcp server status
分为输入和输出两类handler

JDK预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用

理论看太多容易迷失怎么样具体去操作
现在比较流行lettuce框架,底层跟redis通信采用的是netty来做的

RedisClient是redis客户端连接类,看其中一个方法

connectStatefulAsync

    private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

        ConnectionBuilder connectionBuilder;
        // ssl 的关联操作
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }

        connectionBuilder.connection(connection);
        connectionBuilder.clientOptions(clientOptions);
        connectionBuilder.clientResources(clientResources);
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

        // 这个protect方法就是内部包装的传统的netty启动方法
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        channelType(connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            if (hasPassword(redisURI)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }

        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

        if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().auth(new String(redisURI.getPassword()));

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            future.thenApply(channelHandler -> {
                connection.setClientName(redisURI.getClientName());
                return channelHandler;
            });

        }

        if (redisURI.getDatabase() != 0) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().select(redisURI.getDatabase());

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        return future.thenApply(channelHandler -> (S) connection);
    }

connectionBuilder 方法

    protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
            RedisURI redisURI) {

        Bootstrap redisBootstrap = new Bootstrap();
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
        redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);

        SocketOptions socketOptions = getOptions().getSocketOptions();

        redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));

        if (LettuceStrings.isEmpty(redisURI.getSocket())) {
            redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
            redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
        }

        connectionBuilder.timeout(redisURI.getTimeout());
        connectionBuilder.password(redisURI.getPassword());

        connectionBuilder.bootstrap(redisBootstrap);
        connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

DefaultEventLoopGroupProvider中有创建NioEventLoopGroup的工作,默认大小是取系统变量io.netty.eventLoopThreads的值和cpu的个数,我的是4核,所有默认持有4个单线程的Executor执行器,task在每个里面都是由单个执行器来队列完成的。
查看服务器在跑的线程名称后发现实际只有一个
“lettuce-nioEventLoop-6-1” #965 daemon prio=5 os_prio=0 tid=0x00007fec152bc800 nid=0x75d4 runnable [0x00007fecc935f000]
应该是负载还没到,后期压力测试下看看

spring webflux

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html

java redis lua

现在java项目,经常会需要与redis进行交互,有时一次普通的网络请求会调用10几次,虽然是在内网,长连接,但是还是会发生10几次的网络io,占用多的系统资源,当并发上来的时候,会对整体的网络请求发生一定延迟影响。

其中一种解决方案就是将10次请求的处理做成redis script,处理成一次来处理。但是阿里云的redis集群形式,会对key的分布进行限制,如果不在一个redis node中,就会报错。建议使用hash来存储多个数据,集中在一个node中。通过脚本的方式来执行。redis会保证每个脚本执行时的原子性。

redis脚本采用的lua语言来编写,这个语音轻量处理一般的业务逻辑处理来说已经足够。脚本和java来交互参数传递支持两类参数,一类是KEYS,代表的是redis key list;一类是ARGV 代表的是参数列表,需要传递的特殊参数等。常用的方法有tonumber使字符串转成lua。还支持一些常用的lua lib库。

  • base lib.
  • table lib.
  • string lib.
  • math lib.
  • debug lib.
  • struct lib.
  • cjson lib.
  • cmsgpack lib.
  • bitop lib.
  • redis.sha1hex function.

如果传递的java 包装对象,无法以字符串的方式传递时,可以把对象序列化成字符串,再在lua脚本里面通过cjson反序列化成对象,在迭代操作key来做业务处理。同样也会保持原子性的操作。在一定的并发上性能有一定的提升,主要是减少了io的操作次数开销。

    -- dongbin.yu balance filter lua

local configLimit = cjson.decode(ARGV[1])

local flag = false

for i,v in ipairs(configLimit) do
    -- request
    if(v['limitIndicator'] == 1) then
        if(v['limitCycle'] == 1) then
            local r = tonumber(redis.call('HGET', KEYS[1], 'r'))
            if(type(r) ~= 'nil' and r >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 3) then
            local hr = tonumber(redis.call('HGET', KEYS[1], 'hr'))
            if(type(hr) ~= 'nil' and hr >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 4) then
            local mr = tonumber(redis.call('HGET', KEYS[1], 'mr'))
            if(type(mr) ~= 'nil' and mr >= v['limitValue']) then
                flag = true
                break
            end
        end
    elseif(v['limitIndicator'] == 2) then
        if(v['limitCycle'] == 1) then
            local i = tonumber(redis.call('HGET', KEYS[1], 'i'))
            if(type(i) ~= 'nil' and i >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 3) then
            local hi = tonumber(redis.call('HGET', KEYS[1], 'hi'))
            if(type(hi) ~= 'nil' and hi >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 4) then
            local mi = tonumber(redis.call('HGET', KEYS[1], 'mi'))
            if(type(mi) ~= 'nil' and mi >= v['limitValue']) then
                flag = true
                break
            end
        end
    elseif(v['limitIndicator'] == 3) then
        if(v['limitCycle'] == 1) then
            local c = tonumber(redis.call('HGET', KEYS[1], 'c'))
            if(type(c) ~= 'nil' and c >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 3) then

            local hc = tonumber(redis.call('HGET', KEYS[1], 'hc'))
            if(type(hc) ~= 'nil' and hc >= v['limitValue']) then
                flag = true
                break
            end
        elseif (v['limitCycle'] == 4) then
            local mc = tonumber(redis.call('HGET', KEYS[1], 'mc'))
            if(type(mc) ~= 'nil' and mc >= v['limitValue']) then
                flag = true
                break
            end
        end
    end
end

return flag

迭代使用的是for i,v in ipairs,lua代码感觉写起来比java要简洁不少。如果使用openresty的话也是需要掌握lua,游戏领域也有不少游戏是采用lua编写的。