1: <?php
2: /**
3: * CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
4: * Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
5: *
6: * Licensed under The MIT License
7: * For full copyright and license information, please see the LICENSE.txt
8: * Redistributions of files must retain the above copyright notice.
9: *
10: * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
11: * @link https://cakephp.org CakePHP(tm) Project
12: * @license https://opensource.org/licenses/mit-license.php MIT License
13: */
14: namespace Cake\Collection\Iterator;
15:
16: use ArrayIterator;
17: use IteratorAggregate;
18: use LogicException;
19: use Traversable;
20:
/**
 * Implements a simplistic version of the popular Map-Reduce algorithm. Acts
 * like an iterator for the original passed data after each result has been
 * processed, thus offering a transparent wrapper for results coming from any
 * source.
 *
 * Processing is lazy: nothing runs until getIterator() is called for the first
 * time, and the results computed by that run are reused by later calls.
 */
class MapReduce implements IteratorAggregate
{
    /**
     * Holds the shuffled results that were emitted from the map
     * phase, grouped by bucket name
     *
     * @var array
     */
    protected $_intermediate = [];

    /**
     * Holds the results as emitted during the reduce phase
     *
     * @var array
     */
    protected $_result = [];

    /**
     * Whether the Map-Reduce routine has been executed already on the data
     *
     * @var bool
     */
    protected $_executed = false;

    /**
     * Holds the original data that needs to be processed. It is released
     * (set to null) as soon as the map phase has consumed it.
     *
     * @var \Traversable|null
     */
    protected $_data;

    /**
     * A callable that will be executed for each record in the original data
     *
     * @var callable
     */
    protected $_mapper;

    /**
     * A callable that will be executed for each intermediate record emitted during
     * the Map phase
     *
     * @var callable|null
     */
    protected $_reducer;

    /**
     * Count of elements emitted through emit(). It provides the key for
     * results that are emitted without an explicit key.
     *
     * @var int
     */
    protected $_counter = 0;

    /**
     * Constructor
     *
     * ### Example:
     *
     * Separate all unique odd and even numbers in an array
     *
     * ```
     *  $data = new \ArrayObject([1, 2, 3, 4, 5, 3]);
     *  $mapper = function ($value, $key, $mr) {
     *      $type = ($value % 2 === 0) ? 'even' : 'odd';
     *      $mr->emitIntermediate($value, $type);
     *  };
     *
     *  $reducer = function ($numbers, $type, $mr) {
     *      $mr->emit(array_unique($numbers), $type);
     *  };
     *  $results = new MapReduce($data, $mapper, $reducer);
     * ```
     *
     * Previous example will generate the following result:
     *
     * ```
     *  ['odd' => [1, 3, 5], 'even' => [2, 4]]
     * ```
     *
     * @param \Traversable $data the original data to be processed
     * @param callable $mapper the mapper callback. This function will receive 3 arguments.
     * The first one is the current value, second the current results key and third is
     * this class instance so you can call the result emitters.
     * @param callable|null $reducer the reducer callback. This function will receive 3 arguments.
     * The first one is the list of values inside a bucket, second one is the name
     * of the bucket that was created during the mapping phase and third one is an
     * instance of this class.
     */
    public function __construct(Traversable $data, callable $mapper, callable $reducer = null)
    {
        $this->_data = $data;
        $this->_mapper = $mapper;
        $this->_reducer = $reducer;
    }

    /**
     * Returns an iterator with the end result of running the Map and Reduce
     * phases on the original data. The phases are run on the first call only;
     * every call returns a new iterator over the stored results.
     *
     * @return \ArrayIterator
     * @throws \LogicException if emitIntermediate was called but no reducer function
     * was provided
     */
    // No native return type is declared; the attribute below silences the
    // PHP 8.1+ deprecation notice and is parsed as a comment by older versions.
    #[\ReturnTypeWillChange]
    public function getIterator()
    {
        if (!$this->_executed) {
            $this->_execute();
        }

        return new ArrayIterator($this->_result);
    }

    /**
     * Appends a new record to the bucket labelled with $bucket, usually as a result
     * of mapping a single record from the original data.
     *
     * @param mixed $val The record itself to store in the bucket
     * @param string $bucket the name of the bucket where to put the record
     * @return void
     */
    public function emitIntermediate($val, $bucket)
    {
        $this->_intermediate[$bucket][] = $val;
    }

    /**
     * Appends a new record to the final list of results and optionally assigns a key
     * for this record.
     *
     * When no key is given, the current value of the emit counter is used as
     * the key. The counter is increased on every call, whether or not a key
     * was provided.
     *
     * @param mixed $val The value to be appended to the final list of results
     * @param string|null $key an optional key to assign to the value
     * @return void
     */
    public function emit($val, $key = null)
    {
        $this->_result[$key === null ? $this->_counter : $key] = $val;
        $this->_counter++;
    }

    /**
     * Runs the actual Map-Reduce algorithm. This iterates the original data
     * and calls the mapper function for each record, then calls the reduce
     * function for each intermediate bucket created during the Map phase.
     *
     * @return void
     * @throws \LogicException if emitIntermediate was called but no reducer function
     * was provided
     */
    protected function _execute()
    {
        // The source data is released right after the map phase. If an earlier
        // run failed after that point (for instance because no reducer was
        // given), the data is already null here: skip the map phase instead of
        // iterating over null.
        if ($this->_data !== null) {
            $mapper = $this->_mapper;
            foreach ($this->_data as $key => $val) {
                $mapper($val, $key, $this);
            }
            $this->_data = null;
        }

        if (!empty($this->_intermediate) && empty($this->_reducer)) {
            throw new LogicException('No reducer function was provided');
        }

        /** @var callable $reducer */
        $reducer = $this->_reducer;
        foreach ($this->_intermediate as $key => $list) {
            $reducer($list, $key, $this);
        }
        // Buckets are no longer needed once they have been reduced.
        $this->_intermediate = [];
        $this->_executed = true;
    }
}
194: