从流中解出数据
从流中解出树
现在使用 SAX 解析 XML，生成了一系列 event（即事件流）。如何从这些 event 中还原出 DOM（树状数据）？
XML 如下：
<root>
<pluginA><Policy>testA</Policy></pluginA>
<pluginB>testB</pluginB>
</root>
生成的event如下
// Event stream for the XML above. Every Start has a matching End, closed
// innermost-first; the original listing omitted End('Policy').
const events = [
  new Start('root'),
  new Start('pluginA'),
  new Start('Policy'),
  new Text('testA'),
  new End('Policy'),
  new End('pluginA'),
  new Start('pluginB'),
  new Text('testB'),
  new End('pluginB'),
  new End('root'),
]
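问题在于树状结构是嵌套的，只有将其拍平之后才能流式地发送；为了从流中还原出树，我们必须维护这种嵌套关系。
这应当是一个栈结构：每遇到一个 Start，就创建一个元素对象，把它的引用追加到栈顶元素的 child 中（若此时栈为空，则它是根元素，需要额外保留一份引用，例如在栈底多压一次或存入单独的变量），然后让它入栈；遇到 Text 时，把文本追加到栈顶元素的 child 中；遇到 End 时出栈。所有事件处理完毕后，保留下来的根元素就是想要的结构。
本质上就是用一个栈维护了指向尚未闭合元素的引用。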
const expect = require('chai').expect;
/** SAX "start element" event: an opening tag plus its attributes. */
class Start {
  /**
   * @param {string} tag  name of the element being opened
   * @param {object} [attribute]  attribute map; any falsy value becomes `{}`
   */
  constructor(tag, attribute) {
    Object.assign(this, { tag, attribute: attribute || {} });
  }
}
/** SAX "end element" event: the closing tag of an element. */
class End {
  /** @param {string} tag  name of the element being closed */
  constructor(tag) {
    Object.assign(this, { tag });
  }
}
/** SAX character-data event: a run of text inside the current element. */
class Text {
  /** @param {string} text  the character data */
  constructor(text) {
    Object.assign(this, { text });
  }
}
/** A node of the rebuilt tree: tag name, attributes, and ordered children. */
class Element {
  /**
   * @param {string} tagName  element name
   * @param {object} [attribute]  attribute map; any falsy value becomes `{}`
   * @param {Array} [child]  child Elements and text strings; any falsy value becomes `[]`
   */
  constructor(tagName, attribute, child) {
    Object.assign(this, {
      tagName,
      attribute: attribute || {},
      child: child || [],
    });
  }
}
/**
 * Incrementally rebuilds an Element tree from SAX-style events.
 *
 * `stack` holds the currently open elements (innermost last). `root` keeps the
 * outermost element so it is still available after its End event pops it.
 */
class Handle {
  constructor() {
    this.stack = [];
    this.root = undefined;
  }

  /**
   * Opens a new element: attaches it to the innermost open element, or makes it
   * the root when nothing is open, then pushes it onto the stack.
   * @throws {Error} if a second top-level element appears (XML has one root).
   */
  create(tag, attribute) {
    const ele = new Element(tag, attribute);
    const parent = this.stack[this.stack.length - 1];
    if (parent !== undefined) {
      parent.child.push(ele);
    } else if (this.root === undefined) {
      this.root = ele;
    } else {
      // Without this check a second top-level element would be silently
      // nested inside the first root.
      throw new Error(`unexpected second root element <${tag}>`);
    }
    this.stack.push(ele);
  }

  /**
   * Appends a text node to the innermost open element. Whitespace-only text
   * outside every element is ignored because it is not part of the tree.
   * @throws {Error} for non-whitespace text with no open element.
   */
  text(text) {
    const parent = this.stack[this.stack.length - 1];
    if (parent === undefined) {
      if (typeof text === 'string' && text.trim() === '') {
        return;
      }
      throw new Error(`text outside of any element: ${JSON.stringify(text)}`);
    }
    parent.child.push(text);
  }

  /**
   * Closes the innermost open element.
   * @param {string} [tag]  when given, must match the element being closed
   * @throws {Error} if no element is open or the tag does not match.
   */
  end(tag) {
    const top = this.stack[this.stack.length - 1];
    if (top === undefined) {
      throw new Error(`unexpected end tag </${tag}>: no element is open`);
    }
    // A mismatched End means the stream is malformed; popping anyway would
    // silently attach later siblings to the wrong parent.
    if (tag !== undefined && top.tagName !== tag) {
      throw new Error(`mismatched end tag: expected </${top.tagName}>, got </${tag}>`);
    }
    this.stack.pop();
  }

  /**
   * Returns the root Element, or undefined if no Start event has been seen.
   * Elements that are still open are returned as-is (the tree may be partial).
   */
  build() {
    return this.root;
  }
}
/**
 * Replays an iterable of SAX-style events through a Handle and returns the
 * reconstructed root Element. Events that are not Start/Text/End are skipped.
 * @param {Iterable<Start|Text|End>} events
 */
function trans(events) {
  const handler = new Handle();
  for (const event of events) {
    if (event instanceof Start) {
      handler.create(event.tag, event.attribute);
      continue;
    }
    if (event instanceof Text) {
      handler.text(event.text);
      continue;
    }
    if (event instanceof End) {
      handler.end(event.tag);
    }
  }
  return handler.build();
}
describe('Trans', () => {
  // Event stream for:
  // <root><pluginA><Policy>testA</Policy></pluginA><pluginB>testB</pluginB></root>
  const MockEvents = [
    new Start('root'),
    new Start('pluginA'),
    new Start('Policy'),
    new Text('testA'),
    new End('Policy'),
    new End('pluginA'),
    new Start('pluginB'),
    new Text('testB'),
    new End('pluginB'),
    new End('root'),
  ];

  it('should ok', () => {
    // Build the expected tree bottom-up so each level is named.
    const policy = new Element('Policy', {}, ['testA']);
    const pluginA = new Element('pluginA', {}, [policy]);
    const pluginB = new Element('pluginB', {}, ['testB']);
    const expectRes = new Element('root', {}, [pluginA, pluginB]);

    const actual = trans(MockEvents);
    expect(actual).deep.eq(expectRes);
  });
});
从流中聚合数据
差速器
假设有一个流，其元素是某种带有时间值的对象，现在我们希望按指定的时间范围将其聚合。
如同差速器一样，我们只需要知道：
- 当前元素是否可以和之前的元素组合起来，一起被聚合；
- 流结束时，如何处理剩余尚未聚合的元素。
给定固定长度的元素如何将其聚合起来
// Differential-style aggregation loop. Excerpt only; it appears to be the body of an
// iterator's `next` that pulls from `self.data_source` and feeds `self.decoder`:
//
// loop {
//     let item = {
//         if let Some(i) = self.data_source.next() {
//             i
//         } else {
//             // No more input: aggregate whatever is left and return it.
//             return self.decoder.decode_eof();
//         }
//     };
//     // The decoder may produce an aggregated item, in which case return it right away;
//     // otherwise it needs more data, so go round the loop again.
//     if let Some(framed_item) = self.decoder.decode(item) {
//         return Some(framed_item);
//     }
// }
RX-RUST
Reactive-style (Rx-like) implementation
use std::marker::{PhantomData, Sized}; use std::time::{Duration, Instant};
pub trait DataSource: Iterator
where
Self: Sized,
{
fn buffertime
fn buffer_when<P>(self, picker: P) -> BufferWhenObservable1<Self, Self::Item, P>
where
Self: Sized,
P: StreamPicker<Item = Self::Item>,
{
return BufferWhenObservable1::new(self, picker);
}
}
/// Verdict a [`StreamPicker`] gives for each item pulled from the source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferWhenState {
    /// Keep the current batch open and add this item to it.
    ContinueAndTakeThis,
    /// Keep the current batch open and discard this item.
    ContinueAndDropThis,
    /// Add this item to the current batch, then close the batch.
    EndAndTakeThis,
    /// Close the current batch without this item; the item starts the next batch.
    EndAndLeftThis,
}
/// Decides, item by item, where a batching iterator splits the stream.
///
/// Public because it appears in the bounds of the public [`DataSource`] methods;
/// a private trait there is a private-in-public error/lint.
pub trait StreamPicker {
    /// Type of the items being batched.
    type Item;

    /// Classifies the next pulled item; `None` means the source has no more items.
    fn pick(&mut self, item: &Option<Self::Item>) -> BufferWhenState;

    /// Clears any per-batch state so the next `pick` starts a fresh batch.
    fn clean(&mut self);
}
[derive(Clone)]
pub struct TimedDecoder
impl
impl
fn pick(&mut self, item: &Option<T>) -> BufferWhenState {
if self.start.is_none() {
self.start = Some(Instant::now());
}
if item.is_none() {
return BufferWhenState::EndAndTakeThis;
}
if let Some(start) = self.start {
if start.elapsed() < self.time_range {
return BufferWhenState::ContinueAndTakeThis;
}
return BufferWhenState::EndAndLeftThis;
}
unreachable!()
}
}
/// Iterator adaptor that yields batches (`Vec<T>`) of consecutive items from
/// `data_source`, with batch boundaries chosen by `picker`.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct BufferWhenObservable1<D, T, P> {
    /// Underlying item source.
    data_source: D,
    /// Items collected for the batch currently being built.
    cache: Vec<T>,
    /// Decides where each batch ends.
    picker: P,
}
impl<D, T, P> BufferWhenObservable1<D, T, P> {
    /// Wraps `data_source`, batching its items according to `picker`; the batch
    /// buffer starts empty.
    fn new(data_source: D, picker: P) -> Self
    where
        P: StreamPicker<Item = T>,
    {
        Self {
            data_source,
            cache: Vec::new(),
            picker,
        }
    }
}
impl<D, T, P> Iterator for BufferWhenObservable1<D, T, P>
where
D: DataSource<Item = T>,
P: StreamPicker<Item = T>,
{
type Item = Vec
BufferWhenState::EndAndTakeThis => {
if let Some(next) = next {
self.cache.push(next);
}
if self.cache.is_empty() {
return None;
}
let ret = self.cache.drain(0..).collect();
return Some(ret);
}
BufferWhenState::EndAndLeftThis => {
let ret = self.cache.drain(0..).collect();
if let Some(next) = next {
self.cache.push(next);
}
if self.cache.is_empty() {
return None;
}
return Some(ret);
}
}
}
unreachable!()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::thread;

    /// Test item: `.0` is the delay in milliseconds before the source yields it.
    #[derive(Debug, Clone, Eq, PartialEq)]
    struct MockData(u64, String);

    /// Queue-backed source that simulates arrival times by sleeping before each item.
    #[derive(Debug, Eq, PartialEq)]
    struct TimedVec(VecDeque<MockData>);

    impl DataSource for TimedVec {}

    impl Iterator for TimedVec {
        type Item = MockData;

        fn next(&mut self) -> Option<Self::Item> {
            let item = self.0.pop_front()?;
            thread::sleep(Duration::from_millis(item.0));
            Some(item)
        }
    }

    // Timing assumption for both tests: items arrive every 10 ms and the window is
    // 20 ms, so the second item of a window lands ~10 ms in (inside the window) and
    // the third at >= 20 ms (outside it).

    #[test]
    fn test_rx_buffer_time() {
        let list = TimedVec(VecDeque::from(vec![
            MockData(10, "1".to_string()),
            MockData(10, "2".to_string()),
            MockData(10, "3".to_string()),
        ]));
        let out: Vec<Vec<MockData>> = list.buffer_time(Duration::from_millis(20)).collect();
        let expect_out = vec![
            vec![MockData(10, "1".to_owned()), MockData(10, "2".to_owned())],
            vec![MockData(10, "3".to_owned())],
        ];
        assert_eq!(out, expect_out);
    }

    #[test]
    fn test_rx_buffer_when() {
        let list = TimedVec(VecDeque::from(vec![
            MockData(10, "1".to_string()),
            MockData(10, "2".to_string()),
            MockData(10, "3".to_string()),
            MockData(10, "4".to_string()),
        ]));
        let out = list.buffer_when(TimedDecoder::new(Duration::from_millis(20)));
        let out: Vec<Vec<MockData>> = out.collect();
        let expect_out = vec![
            vec![MockData(10, "1".to_owned()), MockData(10, "2".to_owned())],
            vec![MockData(10, "3".to_owned()), MockData(10, "4".to_owned())],
        ];
        assert_eq!(out, expect_out);
    }
}