Index: dashboard/dashboard/common/stored_object.py |
diff --git a/dashboard/dashboard/common/stored_object.py b/dashboard/dashboard/common/stored_object.py |
index d0f8b96f31ce6c6906560068b6b37cd6605a8d36..d903134a2d5a47073427c44193e1f5ad49e5fbc2 100644 |
--- a/dashboard/dashboard/common/stored_object.py |
+++ b/dashboard/dashboard/common/stored_object.py |
@@ -40,6 +40,7 @@ _MAX_NUM_PARTS = 16 |
_CHUNK_SIZE = 1000 * 1000 |
+@ndb.synctasklet |
def Get(key): |
"""Gets the value. |
@@ -49,13 +50,22 @@ def Get(key): |
Returns: |
A value for key. |
""" |
- results = MultipartCache.Get(key) |
+ result = yield GetAsync(key) |
+ raise ndb.Return(result) |
+ |
+ |
+@ndb.tasklet |
+def GetAsync(key): |
+ results = yield MultipartCache.GetAsync(key) |
if not results: |
- results = _GetValueFromDatastore(key) |
- MultipartCache.Set(key, results) |
- return results |
+ set_future = MultipartCache.SetAsync(key, results) |
+ get_future = _GetValueFromDatastore(key) |
+ yield set_future, get_future |
+ results = get_future.get_result() |
+ raise ndb.Return(results) |
+@ndb.synctasklet |
def Set(key, value): |
"""Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB. |
@@ -63,18 +73,31 @@ def Set(key, value): |
key: String key value. |
value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB. |
""" |
- entity = ndb.Key(MultipartEntity, key).get() |
+ yield SetAsync(key, value) |
+ |
+ |
+@ndb.tasklet |
+def SetAsync(key, value): |
+ entity = yield ndb.Key(MultipartEntity, key).get_async() |
if not entity: |
entity = MultipartEntity(id=key) |
entity.SetData(value) |
- entity.Save() |
- MultipartCache.Set(key, value) |
+ yield (entity.PutAsync(), |
+ MultipartCache.SetAsync(key, value)) |
+@ndb.synctasklet |
def Delete(key): |
"""Deletes the value in datastore and memcache.""" |
- ndb.Key(MultipartEntity, key).delete() |
- MultipartCache.Delete(key) |
+ yield DeleteAsync(key) |
+ |
+ |
+@ndb.tasklet |
+def DeleteAsync(key): |
+ multipart_entity_key = ndb.Key(MultipartEntity, key) |
+ yield (multipart_entity_key.delete_async(), |
+ MultipartEntity.DeleteAsync(multipart_entity_key), |
+ MultipartCache.DeleteAsync(key)) |
class MultipartEntity(ndb.Model): |
@@ -83,27 +106,27 @@ class MultipartEntity(ndb.Model): |
# Number of entities use to store serialized. |
size = ndb.IntegerProperty(default=0, indexed=False) |
- @classmethod |
- def _post_get_hook(cls, key, future): # pylint: disable=unused-argument |
+ @ndb.tasklet |
+ def GetPartsAsync(self): |
"""Deserializes data from multiple PartEntity.""" |
- entity = future.get_result() |
- if entity is None or not entity.size: |
+ if self.size: |
return |
- string_id = entity.key.string_id() |
+ string_id = self.key.string_id() |
part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1) |
for i in xrange(entity.size)] |
- part_entities = ndb.get_multi(part_keys) |
+ part_entities = yield ndb.get_multi_async(part_keys) |
serialized = ''.join(p.value for p in part_entities if p is not None) |
- entity.SetData(pickle.loads(serialized)) |
+ self.SetData(pickle.loads(serialized)) |
@classmethod |
- def _pre_delete_hook(cls, key): |
- """Deletes PartEntity entities.""" |
- part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True) |
- ndb.delete_multi(part_keys) |
+ @ndb.tasklet |
+ def DeleteAsync(cls, key): |
+ part_keys = yield PartEntity.query(ancestor=key).fetch_async(keys_only=True) |
+ yield ndb.delete_multi_async(part_keys) |
- def Save(self): |
+ @ndb.tasklet |
+ def PutAsync(self): |
"""Stores serialized data over multiple PartEntity.""" |
serialized_parts = _Serialize(self.GetData()) |
if len(serialized_parts) > _MAX_NUM_PARTS: |
@@ -116,7 +139,7 @@ class MultipartEntity(ndb.Model): |
part = PartEntity(id=i + 1, parent=self.key, value=serialized_parts[i]) |
part_list.append(part) |
self.size = num_parts |
- ndb.put_multi(part_list + [self]) |
+ yield ndb.put_multi_async(part_list + [self]) |
def GetData(self): |
return getattr(self, '_data', None) |
@@ -142,43 +165,54 @@ class MultipartCache(object): |
""" |
@classmethod |
- def Get(cls, key): |
+ @ndb.tasklet |
+ def GetAsync(cls, key): |
"""Gets value in memcache.""" |
keys = cls._GetCacheKeyList(key) |
head_key = cls._GetCacheKey(key) |
- cache_values = memcache.get_multi(keys) |
+ client = memcache.Client() |
+ cache_values = yield client.get_multi_async(keys) |
# Whether we have all the memcache values. |
if len(keys) != len(cache_values) or head_key not in cache_values: |
- return None |
+ raise ndb.Return(None) |
serialized = '' |
cache_size = cache_values[head_key] |
keys.remove(head_key) |
for key in keys[:cache_size]: |
if key not in cache_values: |
- return None |
+ raise ndb.Return(None) |
if cache_values[key] is not None: |
serialized += cache_values[key] |
- return pickle.loads(serialized) |
+ raise ndb.Return(pickle.loads(serialized)) |
@classmethod |
- def Set(cls, key, value): |
+ @ndb.tasklet |
+ def SetAsync(cls, key, value): |
"""Sets a value in memcache.""" |
serialized_parts = _Serialize(value) |
if len(serialized_parts) > _MAX_NUM_PARTS: |
logging.error('Max number of parts reached.') |
- return |
+ raise ndb.Return(None) |
cached_values = {} |
cached_values[cls._GetCacheKey(key)] = len(serialized_parts) |
for i in xrange(len(serialized_parts)): |
cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i] |
- memcache.set_multi(cached_values) |
+ client = memcache.Client() |
+ yield client.set_multi_async(cached_values) |
@classmethod |
+ @ndb.synctasklet |
def Delete(cls, key): |
"""Deletes all cached values for key.""" |
- memcache.delete_multi(cls._GetCacheKeyList(key)) |
+ yield cls.DeleteAsync(key) |
+ |
+ @classmethod |
+ @ndb.tasklet |
+ def DeleteAsync(cls, key): |
+ client = memcache.Client() |
+ yield client.delete_multi_async(cls._GetCacheKeyList(key)) |
@classmethod |
def _GetCacheKeyList(cls, key): |
@@ -195,11 +229,13 @@ class MultipartCache(object): |
return _MULTIPART_ENTITY_MEMCACHE_KEY + key |
+@ndb.tasklet |
def _GetValueFromDatastore(key): |
- entity = ndb.Key(MultipartEntity, key).get() |
+ entity = yield ndb.Key(MultipartEntity, key).get_async() |
if not entity: |
- return None |
- return entity.GetData() |
+ raise ndb.Return(None) |
+ yield entity.GetPartsAsync() |
+ raise ndb.Return(entity.GetData()) |
def _Serialize(value): |