OLD | NEW |
1 # Copyright 2015 The Chromium Authors. All rights reserved. | 1 # Copyright 2015 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """A module for storing and getting objects from datastore. | 5 """A module for storing and getting objects from datastore. |
6 | 6 |
7 This module provides Get, Set and Delete functions for storing pickleable | 7 This module provides Get, Set and Delete functions for storing pickleable |
8 objects in datastore, with support for large objects greater than 1 MB. | 8 objects in datastore, with support for large objects greater than 1 MB. |
9 | 9 |
10 Although this module contains ndb.Model classes, these are not intended | 10 Although this module contains ndb.Model classes, these are not intended |
(...skipping 22 matching lines...) Expand all Loading... |
33 # Maximum number of entities and memcache to save a value. | 33 # Maximum number of entities and memcache to save a value. |
34 # The limit for data stored in one datastore entity is 1 MB, | 34 # The limit for data stored in one datastore entity is 1 MB, |
35 # and the limit for memcache batch operations is 32 MB. See: | 35 # and the limit for memcache batch operations is 32 MB. See: |
36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits | 36 # https://cloud.google.com/appengine/docs/python/memcache/#Python_Limits |
37 _MAX_NUM_PARTS = 16 | 37 _MAX_NUM_PARTS = 16 |
38 | 38 |
39 # Max bytes per entity or value cached with memcache. | 39 # Max bytes per entity or value cached with memcache. |
40 _CHUNK_SIZE = 1000 * 1000 | 40 _CHUNK_SIZE = 1000 * 1000 |
41 | 41 |
42 | 42 |
| 43 @ndb.synctasklet |
43 def Get(key): | 44 def Get(key): |
44 """Gets the value. | 45 """Gets the value. |
45 | 46 |
46 Args: | 47 Args: |
47 key: String key value. | 48 key: String key value. |
48 | 49 |
49 Returns: | 50 Returns: |
50 A value for key. | 51 A value for key. |
51 """ | 52 """ |
52 results = MultipartCache.Get(key) | 53 result = yield GetAsync(key) |
53 if not results: | 54 raise ndb.Return(result) |
54 results = _GetValueFromDatastore(key) | |
55 MultipartCache.Set(key, results) | |
56 return results | |
57 | 55 |
58 | 56 |
| 57 @ndb.tasklet |
| 58 def GetAsync(key): |
| 59 results = yield MultipartCache.GetAsync(key) |
| 60 if not results: |
| 61 results = yield _GetValueFromDatastore(key) |
| 62 # Backfill memcache with the value just read from datastore. |
| 63 yield MultipartCache.SetAsync(key, results) |
| 64 |
| 65 raise ndb.Return(results) |
| 66 |
| 67 |
| 68 @ndb.synctasklet |
59 def Set(key, value): | 69 def Set(key, value): |
60 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. | 70 """Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. |
61 | 71 |
62 Args: | 72 Args: |
63 key: String key value. | 73 key: String key value. |
64 value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB. | 74 value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB. |
65 """ | 75 """ |
66 entity = ndb.Key(MultipartEntity, key).get() | 76 yield SetAsync(key, value) |
| 77 |
| 78 |
| 79 @ndb.tasklet |
| 80 def SetAsync(key, value): |
| 81 entity = yield ndb.Key(MultipartEntity, key).get_async() |
67 if not entity: | 82 if not entity: |
68 entity = MultipartEntity(id=key) | 83 entity = MultipartEntity(id=key) |
69 entity.SetData(value) | 84 entity.SetData(value) |
70 entity.Save() | 85 yield (entity.PutAsync(), |
71 MultipartCache.Set(key, value) | 86 MultipartCache.SetAsync(key, value)) |
72 | 87 |
73 | 88 |
| 89 @ndb.synctasklet |
74 def Delete(key): | 90 def Delete(key): |
75 """Deletes the value in datastore and memcache.""" | 91 """Deletes the value in datastore and memcache.""" |
76 ndb.Key(MultipartEntity, key).delete() | 92 yield DeleteAsync(key) |
77 MultipartCache.Delete(key) | 93 |
| 94 |
| 95 @ndb.tasklet |
| 96 def DeleteAsync(key): |
| 97 multipart_entity_key = ndb.Key(MultipartEntity, key) |
| 98 yield (multipart_entity_key.delete_async(), |
| 99 MultipartEntity.DeleteAsync(multipart_entity_key), |
| 100 MultipartCache.DeleteAsync(key)) |
78 | 101 |
79 | 102 |
80 class MultipartEntity(ndb.Model): | 103 class MultipartEntity(ndb.Model): |
81 """Container for PartEntity.""" | 104 """Container for PartEntity.""" |
82 | 105 |
83 # Number of entities use to store serialized. | 106 # Number of entities used to store serialized data. |
84 size = ndb.IntegerProperty(default=0, indexed=False) | 107 size = ndb.IntegerProperty(default=0, indexed=False) |
85 | 108 |
86 @classmethod | 109 @ndb.tasklet |
87 def _post_get_hook(cls, key, future): # pylint: disable=unused-argument | 110 def GetPartsAsync(self): |
88 """Deserializes data from multiple PartEntity.""" | 111 """Deserializes data from multiple PartEntity.""" |
89 entity = future.get_result() | 112 if self.size: |
90 if entity is None or not entity.size: | 112 if not self.size: |
91 return | 113 return |
92 | 114 |
93 string_id = entity.key.string_id() | 115 string_id = self.key.string_id() |
94 part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1) | 116 part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1) |
95 for i in xrange(entity.size)] | 117 for i in xrange(self.size)] |
96 part_entities = ndb.get_multi(part_keys) | 118 part_entities = yield ndb.get_multi_async(part_keys) |
97 serialized = ''.join(p.value for p in part_entities if p is not None) | 119 serialized = ''.join(p.value for p in part_entities if p is not None) |
98 entity.SetData(pickle.loads(serialized)) | 120 self.SetData(pickle.loads(serialized)) |
99 | 121 |
100 @classmethod | 122 @classmethod |
101 def _pre_delete_hook(cls, key): | 123 @ndb.tasklet |
102 """Deletes PartEntity entities.""" | 124 def DeleteAsync(cls, key): |
103 part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True) | 125 part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) |
104 ndb.delete_multi(part_keys) | 126 yield ndb.delete_multi_async(part_keys) |
105 | 127 |
106 def Save(self): | 128 @ndb.tasklet |
| 129 def PutAsync(self): |
107 """Stores serialized data over multiple PartEntity.""" | 130 """Stores serialized data over multiple PartEntity.""" |
108 serialized_parts = _Serialize(self.GetData()) | 131 serialized_parts = _Serialize(self.GetData()) |
109 if len(serialized_parts) > _MAX_NUM_PARTS: | 132 if len(serialized_parts) > _MAX_NUM_PARTS: |
110 logging.error('Max number of parts reached.') | 133 logging.error('Max number of parts reached.') |
111 return | 134 return |
112 part_list = [] | 135 part_list = [] |
113 num_parts = len(serialized_parts) | 136 num_parts = len(serialized_parts) |
114 for i in xrange(num_parts): | 137 for i in xrange(num_parts): |
115 if serialized_parts[i] is not None: | 138 if serialized_parts[i] is not None: |
116 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) | 139 part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) |
117 part_list.append(part) | 140 part_list.append(part) |
118 self.size = num_parts | 141 self.size = num_parts |
119 ndb.put_multi(part_list + [self]) | 142 yield ndb.put_multi_async(part_list + [self]) |
120 | 143 |
121 def GetData(self): | 144 def GetData(self): |
122 return getattr(self, '_data', None) | 145 return getattr(self, '_data', None) |
123 | 146 |
124 def SetData(self, data): | 147 def SetData(self, data): |
125 setattr(self, '_data', data) | 148 setattr(self, '_data', data) |
126 | 149 |
127 | 150 |
128 class PartEntity(ndb.Model): | 151 class PartEntity(ndb.Model): |
129 """Holds a part of serialized data for MultipartEntity. | 152 """Holds a part of serialized data for MultipartEntity. |
130 | 153 |
131 This entity key has the form: | 154 This entity key has the form: |
132 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) | 155 ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index) |
133 """ | 156 """ |
134 value = ndb.BlobProperty() | 157 value = ndb.BlobProperty() |
135 | 158 |
136 | 159 |
137 class MultipartCache(object): | 160 class MultipartCache(object): |
138 """Contains operations for storing values over multiple memcache keys. | 161 """Contains operations for storing values over multiple memcache keys. |
139 | 162 |
140 Values are serialized, split, and stored over multiple memcache keys. The | 163 Values are serialized, split, and stored over multiple memcache keys. The |
141 head cache stores the expected size. | 164 head cache stores the expected size. |
142 """ | 165 """ |
143 | 166 |
144 @classmethod | 167 @classmethod |
145 def Get(cls, key): | 168 @ndb.tasklet |
| 169 def GetAsync(cls, key): |
146 """Gets value in memcache.""" | 170 """Gets value in memcache.""" |
147 keys = cls._GetCacheKeyList(key) | 171 keys = cls._GetCacheKeyList(key) |
148 head_key = cls._GetCacheKey(key) | 172 head_key = cls._GetCacheKey(key) |
149 cache_values = memcache.get_multi(keys) | 173 client = memcache.Client() |
| 174 cache_values = yield client.get_multi_async(keys) |
150 # Whether we have all the memcache values. | 175 # Whether we have all the memcache values. |
151 if len(keys) != len(cache_values) or head_key not in cache_values: | 176 if len(keys) != len(cache_values) or head_key not in cache_values: |
152 return None | 177 raise ndb.Return(None) |
153 | 178 |
154 serialized = '' | 179 serialized = '' |
155 cache_size = cache_values[head_key] | 180 cache_size = cache_values[head_key] |
156 keys.remove(head_key) | 181 keys.remove(head_key) |
157 for key in keys[:cache_size]: | 182 for key in keys[:cache_size]: |
158 if key not in cache_values: | 183 if key not in cache_values: |
159 return None | 184 raise ndb.Return(None) |
160 if cache_values[key] is not None: | 185 if cache_values[key] is not None: |
161 serialized += cache_values[key] | 186 serialized += cache_values[key] |
162 return pickle.loads(serialized) | 187 raise ndb.Return(pickle.loads(serialized)) |
163 | 188 |
164 @classmethod | 189 @classmethod |
165 def Set(cls, key, value): | 190 @ndb.tasklet |
| 191 def SetAsync(cls, key, value): |
166 """Sets a value in memcache.""" | 192 """Sets a value in memcache.""" |
167 serialized_parts = _Serialize(value) | 193 serialized_parts = _Serialize(value) |
168 if len(serialized_parts) > _MAX_NUM_PARTS: | 194 if len(serialized_parts) > _MAX_NUM_PARTS: |
169 logging.error('Max number of parts reached.') | 195 logging.error('Max number of parts reached.') |
170 return | 196 raise ndb.Return(None) |
171 | 197 |
172 cached_values = {} | 198 cached_values = {} |
173 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) | 199 cached_values[cls._GetCacheKey(key)] = len(serialized_parts) |
174 for i in xrange(len(serialized_parts)): | 200 for i in xrange(len(serialized_parts)): |
175 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] | 201 cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] |
176 memcache.set_multi(cached_values) | 202 client = memcache.Client() |
| 203 yield client.set_multi_async(cached_values) |
177 | 204 |
178 @classmethod | 205 @classmethod |
| 206 @ndb.synctasklet |
179 def Delete(cls, key): | 207 def Delete(cls, key): |
180 """Deletes all cached values for key.""" | 208 """Deletes all cached values for key.""" |
181 memcache.delete_multi(cls._GetCacheKeyList(key)) | 209 yield cls.DeleteAsync(key) |
| 210 |
| 211 @classmethod |
| 212 @ndb.tasklet |
| 213 def DeleteAsync(cls, key): |
| 214 client = memcache.Client() |
| 215 yield client.delete_multi_async(cls._GetCacheKeyList(key)) |
182 | 216 |
183 @classmethod | 217 @classmethod |
184 def _GetCacheKeyList(cls, key): | 218 def _GetCacheKeyList(cls, key): |
185 """Gets a list of head cache key and cache key parts.""" | 219 """Gets a list of head cache key and cache key parts.""" |
186 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] | 220 keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)] |
187 keys.append(cls._GetCacheKey(key)) | 221 keys.append(cls._GetCacheKey(key)) |
188 return keys | 222 return keys |
189 | 223 |
190 @classmethod | 224 @classmethod |
191 def _GetCacheKey(cls, key, index=None): | 225 def _GetCacheKey(cls, key, index=None): |
192 """Returns either head cache key or cache key part.""" | 226 """Returns either head cache key or cache key part.""" |
193 if index is not None: | 227 if index is not None: |
194 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) | 228 return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index) |
195 return _MULTIPART_ENTITY_MEMCACHE_KEY + key | 229 return _MULTIPART_ENTITY_MEMCACHE_KEY + key |
196 | 230 |
197 | 231 |
| 232 @ndb.tasklet |
198 def _GetValueFromDatastore(key): | 233 def _GetValueFromDatastore(key): |
199 entity = ndb.Key(MultipartEntity, key).get() | 234 entity = yield ndb.Key(MultipartEntity, key).get_async() |
200 if not entity: | 235 if not entity: |
201 return None | 236 raise ndb.Return(None) |
202 return entity.GetData() | 237 yield entity.GetPartsAsync() |
| 238 raise ndb.Return(entity.GetData()) |
203 | 239 |
204 | 240 |
205 def _Serialize(value): | 241 def _Serialize(value): |
206 """Serializes value and returns a list of its parts. | 242 """Serializes value and returns a list of its parts. |
207 | 243 |
208 Args: | 244 Args: |
209 value: A pickleable value. | 245 value: A pickleable value. |
210 | 246 |
211 Returns: | 247 Returns: |
212 A list of string representation of the value that has been pickled and split | 248 A list of string representation of the value that has been pickled and split |
213 into _CHUNK_SIZE. | 249 into _CHUNK_SIZE. |
214 """ | 250 """ |
215 serialized = pickle.dumps(value, 2) | 251 serialized = pickle.dumps(value, 2) |
216 length = len(serialized) | 252 length = len(serialized) |
217 values = [] | 253 values = [] |
218 for i in xrange(0, length, _CHUNK_SIZE): | 254 for i in xrange(0, length, _CHUNK_SIZE): |
219 values.append(serialized[i:i + _CHUNK_SIZE]) | 255 values.append(serialized[i:i + _CHUNK_SIZE]) |
220 for i in xrange(len(values), _MAX_NUM_PARTS): | 256 for i in xrange(len(values), _MAX_NUM_PARTS): |
221 values.append(None) | 257 values.append(None) |
222 return values | 258 return values |
OLD | NEW |