From: wiso on 11 Jan 2010 18:15

I'm using a class to read some data from files:

    import multiprocessing
    from collections import defaultdict

    def SingleContainer():
        return list()


    class Container(defaultdict):
        """
        This class stores odd lines in self["odd"] and even lines in
        self["even"]. It is stupid, but it's only an example.
        """
        def __init__(self,file_name):
            if type(file_name) != str:
                raise AttributeError, "%s is not a string" % file_name
            defaultdict.__init__(self,SingleContainer)
            self.file_name = file_name
            self.readen_lines = 0
        def read(self):
            f = open(self.file_name)
            print "start reading file %s" % self.file_name
            for line in f:
                self.readen_lines += 1
                values = line.split()
                key = {0: "even", 1: "odd"}[self.readen_lines %2]
                self[key].append(values)
            print "readen %d lines from file %s" % (self.readen_lines,
                                                     self.file_name)

    """
    Now I want to read more than one file at a time
    """

    def do(file_name):
        container = Container(file_name)
        container.read()
        return container

    if __name__ == "__main__":
        file_names = ["prova_200909.log", "prova_200910.log"]
        pool = multiprocessing.Pool(len(file_names))
        result = pool.map(do,file_names)
        pool.close()
        pool.join()
        print "Finish"

but I got:

    start reading file prova_200909.log
    start reading file prova_200910.log
    readen 142 lines from file prova_200909.log
    readen 160 lines from file prova_200910.log
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
        self.run()
      File "/usr/lib64/python2.6/threading.py", line 477, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
        task = get()
      File "main2.py", line 11, in __init__
        raise AttributeError, "%s is not a string" % file_name
    AttributeError: (AttributeError('<function SingleContainer at 0x7f08b253d938> is not a string',), <class '__main__.Container'>, (<function SingleContainer at 0x7f08b253d938>,))

The problem appears when the pool shares objects, but why is it calling Container.__init__ with a Container parameter?
From: Robert Kern on 11 Jan 2010 18:31

On 2010-01-11 17:15 PM, wiso wrote:
> File "main2.py", line 11, in __init__
> raise AttributeError, "%s is not a string" % file_name
> AttributeError: (AttributeError('<function SingleContainer at
> 0x7f08b253d938> is not a string',),<class '__main__.Container'>,
> (<function SingleContainer at 0x7f08b253d938>,))
>
> the problem is when pool share objects, but why is it calling
> Container.__init__ with a Container parameter?

When you return the container from do() in the worker process, it must be pickled in order to be sent over the socket. You did not override the implementation of the .__reduce_ex__() method, so it used defaultdict's version, which passes the factory function as the first argument to the constructor.

I would recommend passing back the container.items() list instead of a Container instance as the easiest path forward.

--
Robert Kern

"I have come to believe that the whole world is an enigma, a harmless enigma that is made terrible by our own mad attempt to interpret it as though it had an underlying truth."
  -- Umberto Eco
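To make the mechanism above concrete, here is a minimal sketch. It is written for Python 3 rather than the Python 2.6 used in the thread. `Container` and `SingleContainer` follow the original post; `FixedContainer` and its `__reduce__` override are hypothetical and were not proposed in the thread. The first part inspects the reduce tuple that defaultdict produces; the second shows one possible way for a subclass to hand pickle its own constructor arguments.

```python
import pickle
from collections import defaultdict


def SingleContainer():
    return list()


class Container(defaultdict):
    """Same shape as the class in the original post, in Python 3 syntax."""

    def __init__(self, file_name):
        if not isinstance(file_name, str):
            raise AttributeError("%s is not a string" % file_name)
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0


class FixedContainer(Container):
    """Hypothetical variant that tells pickle how to rebuild itself."""

    def __reduce__(self):
        # Tuple layout: (callable, constructor args, instance state,
        # list items, dict items).
        state = {"readen_lines": self.readen_lines}
        return (self.__class__, (self.file_name,), state, None,
                iter(self.items()))


broken = Container("prova_200909.log")
# defaultdict's __reduce__ puts the default factory in the constructor
# args, so unpickling would call Container(SingleContainer).
constructor, args = broken.__reduce__()[:2]
print(constructor is Container, args == (SingleContainer,))

fixed = FixedContainer("prova_200909.log")
fixed["odd"].append(["a", "b"])
fixed.readen_lines = 1
clone = pickle.loads(pickle.dumps(fixed))
print(clone.file_name, clone.readen_lines, dict(clone))
```

Returning container.items(), as recommended above, remains the simpler route; the override is only worth considering if the receiving side really needs a Container instance.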
From: wiso on 11 Jan 2010 18:50

Robert Kern wrote:
> When you return the container from do() in the worker process, it must be
> pickled in order to be sent over the socket. You did not override the
> implementation of the .__reduce_ex__() method, so it used defaultdict's
> version which passes the factory function as the first argument to the
> constructor.
>
> I would recommend passing back the container.items() list instead of a
> Container instance as the easiest path forward.

Thank you very much. I changed "return container" to "return container.items()" and it works:

    start reading file prova_200909.log
    readen 142 lines from file prova_200909.log
    start reading file prova_200910.log
    readen 160 lines from file prova_200910.log
    Finish

The problem now is this:

    start reading file r1_200909.log
    start reading file r1_200910.log
    readen 488832 lines from file r1_200910.log
    readen 517247 lines from file r1_200909.log

With huge files (the real case) the program freezes. Is there a way to avoid pickling/serialization? For example, something like this:

    if __name__ == "__main__":
        file_names = ["r1_200909.log", "r1_200910.log"]
        pool = multiprocessing.Pool(len(file_names))
        childrens = [Container(f) for f in file_names]
        pool.map(lambda c: c.read(), childrens)

which fails with:

    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
From: Robert Kern on 11 Jan 2010 19:15

On 2010-01-11 17:50 PM, wiso wrote:
> with huge file (the real case) the program freeze. Is there a solution to
> avoid pickling/serialization, ... for example something like this:
>
> if __name__ == "__main__":
>     file_names = ["r1_200909.log", "r1_200910.log"]
>     pool = multiprocessing.Pool(len(file_names))
>     childrens = [Container(f) for f in file_names]
>     pool.map(lambda c: c.read(), childrens)
>
> PicklingError: Can't pickle <type 'function'>: attribute lookup
> __builtin__.function failed

You can't pickle lambda functions.

What information do you actually need back from the workers?

--
Robert Kern
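The reason is that pickle stores a function as a reference to its module and name, and a lambda has no name that can be looked up again. A minimal Python 3 sketch of the difference follows; `read_container` and `can_pickle` are hypothetical names used only for illustration.

```python
import pickle


def read_container(container):
    # Module-level function: pickle can refer to it by module and name.
    return container.read()


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


named_ok = can_pickle(read_container)
lambda_ok = can_pickle(lambda c: c.read())
print(named_ok, lambda_ok)
```

Note that swapping the lambda for a named function would only remove the PicklingError. pool.map() would still send a pickled copy of each container to a worker, so read() would fill the copy and the containers in the parent process would stay empty unless the worker returns the data.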
From: wiso on 12 Jan 2010 05:48

Robert Kern wrote:
> You can't pickle lambda functions.
>
> What information do you actually need back from the workers?

They send back the object filled with data. The problem is very simple: I have a container, and the container has a method read(file_name) that reads a huge file and fills the container with data. I have more than one file to read, so I want to parallelize this process. The reading method is quite slow because it involves regexes.
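One way to structure this is sketched below for Python 3, assuming the parent really needs the full parsed data. Each worker does the slow parsing and returns only built-in types, and the parent rebuilds a container from them. `parse_file`, `read_all`, and the layout of the returned tuple are hypothetical; the odd/even split mirrors the example class from the first post. The results are still pickled on the way back, so this does not remove the serialization cost for huge files; it only sidesteps the problem of pickling a custom class.

```python
import multiprocessing
from collections import defaultdict


def parse_file(file_name):
    """Worker: parse one file and return only plain, picklable data."""
    data = {"even": [], "odd": []}
    lines_read = 0
    with open(file_name) as f:
        for line in f:
            lines_read += 1
            key = "odd" if lines_read % 2 else "even"
            data[key].append(line.split())
    return file_name, lines_read, data


def read_all(file_names):
    """Parent: run one worker per file and collect results by file name."""
    results = {}
    with multiprocessing.Pool(len(file_names)) as pool:
        # imap_unordered yields each result as soon as its worker finishes.
        for file_name, lines_read, data in pool.imap_unordered(
                parse_file, file_names):
            container = defaultdict(list, data)
            results[file_name] = (lines_read, container)
    return results
```

As in the original script, read_all(["r1_200909.log", "r1_200910.log"]) should be called from under an `if __name__ == "__main__":` guard. If only a summary of each file is needed, returning that summary instead of every parsed line would shrink what has to be pickled.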