|
a |
|
b/singlecellmultiomics/pyutils/handlelimiter.py |
|
|
1 |
# Handle limiter written by Buys de Barbanson, Hubrecht 2017 |
|
|
2 |
# This class allows for writing many files at the same time without the |
|
|
3 |
# hassle of thinking about handle limitations. |
|
|
4 |
import gzip |
|
|
5 |
import time |
|
|
6 |
|
|
|
7 |
|
|
|
8 |
class HandleLimiter(object):
    """Write to many files at once without exhausting OS file handles.

    Handles are opened lazily on the first write to a path and kept open for
    subsequent writes. Every ``pruneEvery`` writes the least recently written
    handles are closed until at most ``maxHandles`` remain open. A path that
    was written before is reopened in append mode, so pruning never truncates
    earlier output.

    Attributes:
        openHandles: dict mapping path -> {'handle': file object,
            'lastw': time of last write, 'binary': True for gzip handles}.
        seen: set of paths that have been opened by this instance before.
        maxHandles: number of handles left open after a prune.
        pruneEvery: number of writes between prunes.
        pruneIntervalCounter: writes performed since the last prune.
        compressionLevel: gzip compression level used for ``method=1``.
    """

    def __init__(self, maxHandles=32, pruneEvery=10000, compressionLevel=1):
        self.openHandles = {}
        self.seen = set()  # Which files have been opened before
        self.maxHandles = maxHandles
        self.pruneEvery = pruneEvery
        self.pruneIntervalCounter = 0
        self.compressionLevel = compressionLevel

    def _openHandle(self, path, method, append):
        """Open *path* for writing: gzip (binary) if method == 1, else text."""
        if method == 1:
            return gzip.open(
                path, 'ab' if append else 'wb', self.compressionLevel)
        return open(path, 'a' if append else 'w')

    def _closeEntry(self, path):
        """Unregister *path* and close its handle; never raises on failure."""
        entry = self.openHandles.pop(path, None)
        if entry is None:
            return
        handle = entry.get('handle')
        if handle is None:
            print('Closed broken file handle for %s' % path)
            return
        try:
            handle.close()
        except Exception as e:
            # Best effort: report instead of silently hiding a failed flush.
            print('Failed closing file handle for %s: %s' % (path, e))

    def write(self, path, string, method=None, forceAppend=False):
        """Write *string* to *path*, opening the file when necessary.

        Args:
            path: Target file path.
            string: Text (``str``) to write.
            method: ``1`` writes gzip-compressed UTF-8 bytes; ``0``, ``None``
                or any other value writes plain text.
            forceAppend: Open in append mode even when this instance has not
                written to *path* before.

        Raises:
            OSError: The file could not be opened, even after closing every
                other handle held by this instance.
        """
        entry = self.openHandles.get(path)
        if entry is None:
            # Append when we already wrote to the file (or when forced),
            # otherwise start a new file on the first write.
            append = forceAppend or path in self.seen
            while True:
                try:
                    handle = self._openHandle(path, method, append)
                    break
                except OSError as e:
                    if self.openHandles:
                        # Probably out of file handles: free all and retry.
                        self.close()
                    else:
                        # This is majorly bad...
                        print(
                            'Failed writing to %s, even after closing all other open file-handles. Out of options...' %
                            path)
                        print(e)
                        # Raise the error to the parent method
                        raise
            # Register only after a successful open, so a failure never
            # leaves a handle-less entry behind.
            entry = {
                'handle': handle,
                'binary': method == 1,
                'lastw': time.time(),
            }
            self.openHandles[path] = entry
            # Remember that we accessed this file
            self.seen.add(path)

        # Encode according to how the handle was opened, not the per-call
        # method, so text handles get str and gzip handles get bytes.
        if entry.get('binary', method == 1):
            entry['handle'].write(bytes(string, 'UTF-8'))
        else:
            entry['handle'].write(string)
        entry['lastw'] = time.time()

        self.pruneIntervalCounter += 1
        if self.pruneIntervalCounter >= self.pruneEvery:
            self.prune()

    def prune(self):
        """Close the least recently written handles above ``maxHandles``."""
        excess = len(self.openHandles) - self.maxHandles
        if excess > 0:
            oldestFirst = sorted(
                self.openHandles,
                key=lambda path: self.openHandles[path].get('lastw', 0))
            for path in oldestFirst[:excess]:
                self._closeEntry(path)
        self.pruneIntervalCounter = 0

    def close(self):
        """Close and forget every open handle (``seen`` is kept)."""
        # Iterate over a copy: _closeEntry mutates openHandles.
        for path in list(self.openHandles):
            self._closeEntry(path)