从流中解出数据

从流中解出树

现使用 SAX 解析 XML，生成了一系列 event（流）。如何从这些 event 中解出 DOM（数据）？

xml如下

<root>
<pluginA><Policy>testA</Policy></pluginA>
<pluginB>testB</pluginB>
</root>

生成的event如下

// Flattened SAX event stream for the XML above, in document order.
// Every Start has a matching End; the End for 'Policy' is required,
// otherwise 'pluginB' would end up nested inside 'pluginA'.
const events = [
    new Start('root'),
    new Start('pluginA'),
    new Start('Policy'),
    new Text('testA'),
    new End('Policy'),
    new End('pluginA'),
    new Start('pluginB'),
    new Text('testB'),
    new End('pluginB'),
    new End('root'),
]

问题在于树状结构是嵌套的，只有将其拍平之后才能流式地发送。为了从流中解出树，我们必须重新维护这种嵌套关系。

这应当是个栈的结构：每次遇到 Start 就创建一个对象，把此对象的引用放到栈顶对象的 child 中，然后此对象入栈；遇到 End 则出栈。最先入栈的元素（根）即为想要的结构。

本质上是维护了一个引用的栈。

const expect = require('chai').expect;
/**
 * Event emitted when an opening tag is encountered.
 *
 * `tag` is the element name; `attribute` holds the tag's attributes and
 * falls back to a fresh empty object when a falsy value is passed.
 */
class Start {
    constructor(tag, attribute) {
        Object.assign(this, {
            tag,
            attribute: attribute ? attribute : {},
        });
    }
}
/**
 * Event emitted when a closing tag is encountered; `tag` is the element name.
 */
class End {
    constructor(tag) {
        Object.assign(this, { tag });
    }
}
/**
 * Event carrying the character data found between two tags.
 */
class Text {
    constructor(text) {
        Object.assign(this, { text });
    }
}

/**
 * Node of the reconstructed tree.
 *
 * `tagName` is the element name, `attribute` its attributes and `child`
 * the ordered list of children (nested Elements or plain text values).
 * Falsy `attribute` / `child` arguments are replaced by a fresh empty
 * object / array.
 */
class Element {
    constructor(tagName, attribute, child) {
        const attrs = attribute ? attribute : {};
        const children = child ? child : [];
        Object.assign(this, { tagName, attribute: attrs, child: children });
    }
}

/**
 * Rebuilds an Element tree from a flattened stream of SAX-style events.
 *
 * Feed it the events in order through create() / text() / end() and call
 * build() at the end to obtain the root Element.
 */
class Handle {
    constructor() {
        // Elements that have been opened but not closed yet, innermost last.
        this.stack = [];
        // First element ever created; this is what build() returns.
        this.root = undefined;
    }

    /**
     * Element that should receive the next child: the innermost open
     * element, or the root once every element has been closed.
     * Returns undefined before the first element has been created.
     */
    _current() {
        return this.stack.length > 0
            ? this.stack[this.stack.length - 1]
            : this.root;
    }

    /**
     * Handles an opening tag: creates the element, attaches it to the
     * current parent and makes it the innermost open element.
     */
    create(tag, attribute) {
        const ele = new Element(tag, attribute);
        const parent = this._current();
        if (parent === undefined) {
            // Very first element of the stream: it becomes the root.
            this.root = ele;
        } else {
            parent.child.push(ele);
        }
        this.stack.push(ele);
    }

    /**
     * Handles character data by appending it to the current parent.
     * Throws if no element has been created yet.
     */
    text(text) {
        const parent = this._current();
        if (parent === undefined) {
            throw new Error('Text event received before any element was opened');
        }
        parent.child.push(text);
    }

    /**
     * Handles a closing tag by closing the innermost open element.
     * Throws if nothing is open, or if `tag` is given and does not match
     * the element being closed; a mismatch means the stream is mis-nested
     * and would otherwise silently produce a wrong tree.
     */
    end(tag) {
        const open = this.stack.pop();
        if (open === undefined) {
            throw new Error(`Unexpected end tag </${tag}>: no element is open`);
        }
        if (tag !== undefined && open.tagName !== tag) {
            throw new Error(
                `Mismatched end tag: expected </${open.tagName}> but got </${tag}>`);
        }
    }

    /**
     * Returns the root Element, or undefined when no element was created.
     */
    build() {
        return this.root;
    }
}

/**
 * Replays an iterable of Start / Text / End events through a Handle and
 * returns whatever the handle builds. Events of any other type are skipped.
 */
function trans(events) {
    const builder = new Handle();
    for (const ev of events) {
        if (ev instanceof Start) {
            builder.create(ev.tag, ev.attribute);
            continue;
        }
        if (ev instanceof Text) {
            builder.text(ev.text);
            continue;
        }
        if (ev instanceof End) {
            builder.end(ev.tag);
        }
    }
    return builder.build();
}

describe('Trans', () => {
    // Well-formed event stream for:
    // <root><pluginA><Policy>testA</Policy></pluginA><pluginB>testB</pluginB></root>
    const MockEvents = [
        new Start('root'),
        new Start('pluginA'),
        new Start('Policy'),
        new Text('testA'),
        new End('Policy'),
        new End('pluginA'),
        new Start('pluginB'),
        new Text('testB'),
        new End('pluginB'),
        new End('root'),
    ];
    it('should ok', () => {
        // Build the expected tree bottom-up so each level stays readable.
        const policy = new Element('Policy', {}, ['testA']);
        const pluginA = new Element('pluginA', {}, [policy]);
        const pluginB = new Element('pluginB', {}, ['testB']);
        const expectRes = new Element('root', {}, [pluginA, pluginB]);

        const res = trans(MockEvents);
        expect(res).deep.eq(expectRes);
    });
});

从流中聚合数据

差速器

假设有一个流，元素为某种含有时间值的对象。现在我们希望指定一个时间范围，将这段时间内的元素聚合起来。

如同差速器一样 我们只要知道

  1. 当前的元素是否足够和之前元素组合起来一起被聚合
  2. 给定固定长度的元素如何将其聚合起来

    // The "differential": keep pulling items until the decoder emits a frame.
    loop {
        match self.data_source.next() {
            // The decoder either produces an aggregated frame, which is
            // returned right away, or it needs more data, in which case we
            // simply go round the loop again.
            Some(item) => {
                if let Some(framed_item) = self.decoder.decode(item) {
                    return Some(framed_item);
                }
            }
            // Nothing more upstream: aggregate whatever is left.
            None => return self.decoder.decode_eof(),
        }
    }

    RX-RUST

    reactive like impl

    use std::marker::{PhantomData, Sized};
    use std::time::{Duration, Instant};

/// Extension trait that adds Rx-style buffering operators to a sized iterator.
///
/// The operators are provided methods, so an empty
/// `impl DataSource for MyIter {}` is enough to get them.
pub trait DataSource: Iterator
where
    Self: Sized,
{
    /// Groups the items into batches using a [`TimedDecoder`] built from `time`.
    ///
    /// The clock of a batch starts when the first freshly pulled item of that
    /// batch arrives. An item arriving once `time` has elapsed closes the batch
    /// and becomes the first element of the next one.
    fn buffer_time(
        self,
        time: Duration,
    ) -> BufferWhenObservable1<Self, Self::Item, TimedDecoder<Self::Item>>
    where
        Self::Item: Clone,
    {
        self.buffer_when(TimedDecoder::new(time))
    }

    /// Groups the items into batches, letting `picker` decide for every item
    /// whether the current batch goes on or ends.
    fn buffer_when<P>(self, picker: P) -> BufferWhenObservable1<Self, Self::Item, P>
    where
        P: StreamPicker<Item = Self::Item>,
    {
        BufferWhenObservable1::new(self, picker)
    }
}

/// Verdict returned by a [`StreamPicker`] for one item of the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferWhenState {
    /// Keep the batch open and append the current item to it.
    ContinueAndTakeThis,
    /// Keep the batch open and discard the current item.
    ContinueAndDropThis,
    /// Append the current item, then emit the batch.
    EndAndTakeThis,
    /// Emit the batch without the current item; the item is kept and becomes
    /// the first element of the next batch.
    EndAndLeftThis,
}

/// Strategy deciding where batches start and end.
pub trait StreamPicker {
    /// Type of the items being inspected.
    type Item;

    /// Inspects the item just pulled from the source and returns the verdict.
    /// `None` is passed once the source has no more items.
    fn pick(&mut self, item: &Option<Self::Item>) -> BufferWhenState;

    /// Resets the per-batch state; called before every new batch.
    fn clean(&mut self);
}

/// [`StreamPicker`] that closes a batch once a fixed time span has elapsed.
#[derive(Debug, Clone)]
pub struct TimedDecoder<T> {
    /// Instant at which the current batch started; `None` until its first item.
    start: Option<Instant>,
    /// Maximum time span covered by one batch.
    time_range: Duration,
    /// Ties the decoder to the item type without storing an item.
    _type_place_hoder: PhantomData<T>,
}

impl<T> TimedDecoder<T> {
    /// Creates a decoder whose batches span at most `time_range`.
    pub fn new(time_range: Duration) -> Self {
        Self {
            start: None,
            time_range,
            _type_place_hoder: PhantomData,
        }
    }
}

impl<T> StreamPicker for TimedDecoder<T> {
    type Item = T;

    fn clean(&mut self) {
        self.start = None;
    }

    fn pick(&mut self, item: &Option<T>) -> BufferWhenState {
        // The clock starts at the first call after `clean`.
        let start = *self.start.get_or_insert_with(Instant::now);

        if item.is_none() {
            // End of stream: flush whatever has been collected.
            return BufferWhenState::EndAndTakeThis;
        }

        if start.elapsed() < self.time_range {
            BufferWhenState::ContinueAndTakeThis
        } else {
            BufferWhenState::EndAndLeftThis
        }
    }
}

/// Iterator adaptor yielding the batches (`Vec<T>`) cut out of `data_source`
/// by `picker`. Built through [`DataSource::buffer_when`].
#[derive(Debug)]
pub struct BufferWhenObservable1<D, T, P> {
    data_source: D,
    /// Items collected for the batch in progress, plus a possible leftover
    /// from the previous batch.
    cache: Vec<T>,
    picker: P,
}

impl<D, T, P> BufferWhenObservable1<D, T, P> {
    fn new(data_source: D, picker: P) -> Self
    where
        P: StreamPicker<Item = T>,
    {
        Self {
            data_source,
            cache: Vec::new(),
            picker,
        }
    }

    /// Empties the cache into a batch; `None` when there is nothing to emit.
    fn flush(&mut self) -> Option<Vec<T>> {
        if self.cache.is_empty() {
            None
        } else {
            Some(self.cache.drain(..).collect())
        }
    }
}

impl<D, T, P> Iterator for BufferWhenObservable1<D, T, P>
where
    D: DataSource<Item = T>,
    P: StreamPicker<Item = T>,
{
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        self.picker.clean();
        loop {
            let next = self.data_source.next();
            let exhausted = next.is_none();
            let state = self.picker.pick(&next);
            match state {
                BufferWhenState::ContinueAndTakeThis => self.cache.extend(next),
                BufferWhenState::ContinueAndDropThis => {}
                BufferWhenState::EndAndTakeThis => {
                    self.cache.extend(next);
                    return self.flush();
                }
                BufferWhenState::EndAndLeftThis => {
                    let batch: Vec<T> = self.cache.drain(..).collect();
                    // The rejected item opens the next batch.
                    self.cache.extend(next);
                    // Only stop when there is neither a batch to emit nor a
                    // leftover waiting for the next call.
                    if batch.is_empty() && self.cache.is_empty() {
                        return None;
                    }
                    return Some(batch);
                }
            }
            if exhausted {
                // The picker asked to continue but the source is dry: emit
                // what was collected instead of polling the source forever.
                return self.flush();
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::thread;

    /// Test item: `.0` is the delay in milliseconds before the item is
    /// produced, `.1` is its payload.
    #[derive(Debug, Clone, Eq, PartialEq)]
    struct MockData(u64, String);

    /// Data source that sleeps for each item's delay before yielding it.
    #[derive(Debug, Eq, PartialEq)]
    struct TimedVec(VecDeque<MockData>);

    impl DataSource for TimedVec {}

    impl Iterator for TimedVec {
        type Item = MockData;

        fn next(&mut self) -> Option<Self::Item> {
            let item = self.0.pop_front()?;
            // Simulate the item taking `item.0` milliseconds to arrive.
            thread::sleep(Duration::from_millis(item.0));
            Some(item)
        }
    }

    // NOTE(review): both tests depend on wall-clock timing (10 ms items in a
    // 20 ms window) and may be flaky on a heavily loaded machine.

    #[test]
    fn test_rx_buffer_time() {
        let list = TimedVec(VecDeque::from(vec![
            MockData(10, "1".to_string()),
            MockData(10, "2".to_string()),
            MockData(10, "3".to_string()),
        ]));
        let out: Vec<Vec<MockData>> = list.buffer_time(Duration::from_millis(20)).collect();
        let expect_out = vec![
            vec![MockData(10, "1".to_owned()), MockData(10, "2".to_owned())],
            vec![MockData(10, "3".to_owned())],
        ];
        assert_eq!(out, expect_out);
    }

    #[test]
    fn test_rx_buffer_when() {
        let list = TimedVec(VecDeque::from(vec![
            MockData(10, "1".to_string()),
            MockData(10, "2".to_string()),
            MockData(10, "3".to_string()),
            MockData(10, "4".to_string()),
        ]));

        let out = list.buffer_when(TimedDecoder::new(Duration::from_millis(20)));
        let out: Vec<Vec<MockData>> = out.collect();
        let expect_out = vec![
            vec![MockData(10, "1".to_owned()), MockData(10, "2".to_owned())],
            vec![MockData(10, "3".to_owned()), MockData(10, "4".to_owned())],
        ];
        assert_eq!(out, expect_out);
    }
}