#!/usr/bin/env python # # zabuton is copyright 2009 by Yang Zhao # This file is distributed under terms of Apache License, Version 2.0 # For full text of the license, see http://www.apache.org/licenses/LICENSE-2.0 import __builtin__ import json import os import sys import traceback from types import FunctionType def _error(id, fmt, *args): return {'error': id, 'reason': fmt % args} class ViewServer: def __init__(self, logger=lambda x: None): self._functions = [] self._map_results = [] self._log = logger def _emit(self, key, value): self._map_results[-1].append((key, value)) def reset(self): del self._functions[:] return True def add_fun(self, func_def): try: code_blob = compile(func_def, '', 'exec') except: return _error('prepare_error', "Error preparing map function:\n%s", traceback.format_exc()) def map_function(func): func._couch_map = True return func g = {'map_function': map_function} l = {} try: exec code_blob in g, l except: return _error('initialization_error', "Error initalizing map function:\n%s", traceback.format_exc()) functions = [x for x in l.itervalues() if type(x) == FunctionType] map_functions = [f for f in functions if f.func_dict.get('_couch_map', False)] if len(map_functions) == 1: function = map_functions[0] elif len(map_functions) > 1: return _error("ambiguous_function_def", "%d functions decorated as mappers; only 1 allowed", len(map_functions)) elif len(functions) > 1: return _error("ambiguous_function_def", "%d functions defined, but none was assigned as mapper", len(functions)) elif not functions: return _error("no_function_def", "No function definition found") else: function = functions[0] l.update(__builtin__.__dict__) if g['__builtins__']: l['__builtins__'] = g['__builtins__'] l['emit'] = self._emit l['log'] = self._log self._functions.append(FunctionType(function.func_code, l)) return True def map_doc(self, doc): del self._map_results[:] for func in self._functions: self._map_results.append([]) try: func(doc) except: return _error("runtime_error", "Runtime error:\n%s", traceback.format_exc()) return self._map_results def _run_reduce(self, code, values, rereduce): '''Extract reduce functions from code string Returns a tuple of (func, global_variables) func will be a generic reduce function of signature: function([value, ...], rereduce) if rereduce is True, None is substituded for [key,id] On error, return (None, error_data) ''' def decorate(type): def f(func): func._couch_reduce = type return func return f def reduce_error(msg, fmt, *args): return (False, _error(msg, fmt, *args)) g = {'reduce_function': decorate('reduce'), 'rereduce_function': decorate('rereduce'), 'allreduce_function': decorate('allreduce')} l = {} try: code_blob = compile(code, '', 'exec') exec code_blob in g, l except: return _error('prepare_error', "Error compiling reduce function:\n%s", traceback.format_exc()) functions = [x for x in l.itervalues() if __builtin__.type(x) == FunctionType] reduce_functions = [f for f in functions if '_couch_reduce' in f.func_dict] l.update(__builtin__.__dict__) l['log'] = self._log if not reduce_functions: if len(functions) == 1: allreduce_fn = functions[0] elif functions: return _error('ambiguous_function_def', "%d functions defined, none marked as reducer", len(functions)) else: funcs = {'reduce': [], 'rereduce': [], 'allreduce': []} for f in reduce_functions: type = f._couch_reduce funcs[type].append(f) if reduce(lambda a,x: a or len(x) > 2, funcs.itervalues(), False): return _error('ambiguous_function_def', "Multiple mapper functions of the same type defined") if len(funcs['allreduce']): if len(reduce_functions) > 1: return _error('ambiguous_function_def', "Generic reduce function defined when specific ones present") allreduce_fn = funcs['allreduce'][0] else: if not (funcs['reduce'] and funcs['rereduce']): return _error('bad_function_def', "Not all required functions defined") def allreduce(vals, rereduce): if rereduce: return _rereduce(vals) else: return _reduce(vals) allreduce_fn = allreduce l['_reduce'] = funcs['reduce'][0] l['_rereduce'] = funcs['rereduce'][0] allreduce_fn = FunctionType(allreduce_fn.func_code, l) try: retval = allreduce_fn(values, rereduce) except: return _error("runtime_error", "Runtime error:\n%s", traceback.format_exc()) else: return (True, [retval]) def reduce(self, code, values): return self._run_reduce(code[0], values, False) def rereduce(self, code, values): return self._run_reduce(code[0], values, True) @classmethod def run(cls, logger=None, input=sys.stdin, output=sys.stdout): def _log(message): json.dump(['log', str(message)], output, separators=(',', ':')) output.write('\n') output.flush() log = logger if logger else _log server = cls(log) try: while True: line = input.readline().rstrip() if not line: break try: cmd = json.loads(line) except ValueError, e: log("JSON Error: %s" % (str(e),)) continue retval = getattr(server, cmd[0])(*cmd[1:]) json.dump(retval, output, separators=(',', ':')) output.write('\n') output.flush() except KeyboardInterrupt: return 0 if __name__ == '__main__': sys.exit(ViewServer.run())