Commit 2c65f620 authored by Yusuke Iwaki's avatar Yusuke Iwaki

file upload via GridFS

parent 214d8bc1
......@@ -2,6 +2,7 @@ package chat.rocket.android.api;
import android.content.Context;
import bolts.Task;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.realm_helper.RealmHelper;
import org.json.JSONArray;
import org.json.JSONObject;
......@@ -30,11 +31,30 @@ public class FileUploadingHelper extends MethodCallHelper {
.onSuccessTask(CONVERT_TO_JSON_OBJECT);
}
public Task<JSONObject> sendFileMessage(String roomId, String repository, JSONObject fileObj) {
public Task<JSONObject> sendFileMessage(String roomId, String storageType, JSONObject fileObj) {
return call("sendFileMessage", TIMEOUT_MS, () -> new JSONArray()
.put(roomId)
.put(repository)
.put(TextUtils.isEmpty(storageType) ? JSONObject.NULL : storageType)
.put(fileObj))
.onSuccessTask(CONVERT_TO_JSON_OBJECT);
}
public Task<JSONObject> ufsCreate(String filename, long filesize, String mimeType,
String roomId) {
return call("ufsCreate", TIMEOUT_MS, () -> new JSONArray().put(new JSONObject()
.put("name", filename)
.put("size", filesize)
.put("type", mimeType)
.put("store", "rocketchat_uploads")
.put("rid", roomId)
)).onSuccessTask(CONVERT_TO_JSON_OBJECT);
}
public Task<JSONObject> ufsComplete(String fileId, String token) {
return call("ufsComplete", TIMEOUT_MS, () -> new JSONArray()
.put(fileId)
.put("rocketchat_uploads")
.put(token)
).onSuccessTask(CONVERT_TO_JSON_OBJECT);
}
}
......@@ -20,7 +20,7 @@ import chat.rocket.android.helper.LoadMoreScrollListener;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.helper.OnBackPressListener;
import chat.rocket.android.helper.TextUtils;
import chat.rocket.android.layouthelper.chatroom.FileUploadHelper;
import chat.rocket.android.helper.FileUploadHelper;
import chat.rocket.android.layouthelper.chatroom.MessageComposerManager;
import chat.rocket.android.layouthelper.chatroom.MessageListAdapter;
import chat.rocket.android.layouthelper.chatroom.PairedMessage;
......@@ -241,8 +241,8 @@ public class RoomFragment extends AbstractChatRoomFragment
return;
}
String uplId = new FileUploadHelper(getContext(), realmHelper, roomId)
.requestUploading(data.getData());
String uplId = new FileUploadHelper(getContext(), realmHelper)
.requestUploading(roomId, data.getData());
if (!TextUtils.isEmpty(uplId)) {
FileUploadProgressDialogFragment.create(serverConfigId, roomId, uplId)
.show(getFragmentManager(), FileUploadProgressDialogFragment.class.getSimpleName());
......
package chat.rocket.android.layouthelper.chatroom;
package chat.rocket.android.helper;
import android.content.ContentResolver;
import android.content.Context;
......@@ -8,9 +8,9 @@ import android.os.ParcelFileDescriptor;
import android.provider.OpenableColumns;
import android.support.annotation.Nullable;
import android.webkit.MimeTypeMap;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.model.SyncState;
import chat.rocket.android.model.ddp.PublicSetting;
import chat.rocket.android.model.internal.FileUploading;
import chat.rocket.android.realm_helper.RealmHelper;
import java.io.FileNotFoundException;
......@@ -25,43 +25,45 @@ public class FileUploadHelper {
private final Context context;
private final RealmHelper realmHelper;
private final String roomId;
public FileUploadHelper(Context context, RealmHelper realmHelper, String roomId) {
public FileUploadHelper(Context context, RealmHelper realmHelper) {
this.context = context;
this.realmHelper = realmHelper;
this.roomId = roomId;
}
/**
* requestUploading file.
* returns id for observing progress.
*/
public @Nullable String requestUploading(Uri uri) {
public @Nullable String requestUploading(String roomId, Uri uri) {
try (Cursor cursor = context.getContentResolver().query(uri, null, null, null, null)) {
if (cursor != null && cursor.moveToFirst()) {
String filename = cursor.getString(cursor.getColumnIndex(OpenableColumns.DISPLAY_NAME));
long filesize = cursor.getLong(cursor.getColumnIndex(OpenableColumns.SIZE));
String mimeType = context.getContentResolver().getType(uri);
return insertRequestRecord(uri, filename, filesize, mimeType);
return insertRequestRecord(roomId, uri, filename, filesize, mimeType);
} else if (ContentResolver.SCHEME_FILE.equals(uri.getScheme())) {
String filename = uri.getLastPathSegment();
long filesize = detectFileSizeFor(uri);
String mimeType = MimeTypeMap.getSingleton()
.getMimeTypeFromExtension(MimeTypeMap.getFileExtensionFromUrl(uri.toString()));
return insertRequestRecord(uri, filename, filesize, mimeType);
return insertRequestRecord(roomId, uri, filename, filesize, mimeType);
}
}
return null;
}
private String insertRequestRecord(Uri uri, String filename, long filesize, String mimeType) {
private String insertRequestRecord(String roomId,
Uri uri, String filename, long filesize, String mimeType) {
final String uplId = UUID.randomUUID().toString();
final String storageType =
PublicSetting.getString(realmHelper, "FileUpload_Storage_Type", null);
realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(FileUploading.class, new JSONObject()
.put("uplId", uplId)
.put("syncstate", SyncState.NOT_SYNCED)
.put("repository", FileUploading.TO_S3) //TODO: should check public-settings.
.put("storageType", TextUtils.isEmpty(storageType) ? JSONObject.NULL : storageType)
.put("uri", uri.toString())
.put("filename", filename)
.put("filesize", filesize)
......
package chat.rocket.android.model.ddp;
import android.support.annotation.Nullable;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.RealmObject;
import io.realm.annotations.PrimaryKey;
import org.json.JSONException;
......@@ -18,6 +20,54 @@ public class PublicSetting extends RealmObject {
private long _updatedAt;
private String meta; //JSON
public String get_id() {
return _id;
}
public void set_id(String _id) {
this._id = _id;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public long get_updatedAt() {
return _updatedAt;
}
public void set_updatedAt(long _updatedAt) {
this._updatedAt = _updatedAt;
}
public String getMeta() {
return meta;
}
public void setMeta(String meta) {
this.meta = meta;
}
public static JSONObject customizeJson(JSONObject settingJson) throws JSONException {
if (!settingJson.isNull("_updatedAt")) {
long updatedAt = settingJson.getJSONObject("_updatedAt").getLong("$date");
......@@ -27,4 +77,27 @@ public class PublicSetting extends RealmObject {
return settingJson;
}
private static @Nullable PublicSetting get(RealmHelper realmHelper, String _id) {
return realmHelper.executeTransactionForRead(realm ->
realm.where(PublicSetting.class).equalTo("_id", _id).findFirst());
}
public static @Nullable String getString(RealmHelper realmHelper,
String _id, String defaultValue) {
PublicSetting setting = get(realmHelper, _id);
if (setting != null) {
return setting.getValue();
}
return defaultValue;
}
public static @Nullable boolean getBoolean(RealmHelper realmHelper,
String _id, boolean defaultValue) {
PublicSetting setting = get(realmHelper, _id);
if (setting != null) {
return Boolean.parseBoolean(setting.getValue());
}
return defaultValue;
}
}
......@@ -7,11 +7,12 @@ import io.realm.annotations.PrimaryKey;
* holding statuses for uploading file.
*/
public class FileUploading extends RealmObject {
public static final int TO_S3 = 1;
public static final String STORAGE_TYPE_S3 = "AmazonS3";
public static final String STORAGE_TYPE_GRID_FS = "GridFS";
@PrimaryKey private String uplId;
private int syncstate;
private int repository;
private String storageType;
private String uri;
private String filename;
private long filesize;
......@@ -37,12 +38,12 @@ public class FileUploading extends RealmObject {
this.syncstate = syncstate;
}
public int getRepository() {
return repository;
public String getStorageType() {
return storageType;
}
public void setRepository(int repository) {
this.repository = repository;
public void setStorageType(String storageType) {
this.storageType = storageType;
}
public String getUri() {
......
......@@ -18,13 +18,14 @@ import chat.rocket.android.service.ddp.base.ActiveUsersSubscriber;
import chat.rocket.android.service.ddp.base.LoginServiceConfigurationSubscriber;
import chat.rocket.android.service.ddp.base.UserDataSubscriber;
import chat.rocket.android.service.observer.CurrentUserObserver;
import chat.rocket.android.service.observer.S3FileUploadingObserver;
import chat.rocket.android.service.observer.GetUsersOfRoomsProcedureObserver;
import chat.rocket.android.service.observer.FileUploadingToGridFsObserver;
import chat.rocket.android.service.observer.LoadMessageProcedureObserver;
import chat.rocket.android.service.observer.MethodCallObserver;
import chat.rocket.android.service.observer.NewMessageObserver;
import chat.rocket.android.service.observer.NotificationItemObserver;
import chat.rocket.android.service.observer.ReactiveNotificationManager;
import chat.rocket.android.service.observer.FileUploadingToS3Observer;
import chat.rocket.android.service.observer.SessionObserver;
import chat.rocket.android.service.observer.TokenLoginObserver;
import chat.rocket.android_ddp.DDPClientCallback;
......@@ -51,7 +52,8 @@ public class RocketChatWebSocketThread extends HandlerThread {
CurrentUserObserver.class,
ReactiveNotificationManager.class,
NotificationItemObserver.class,
S3FileUploadingObserver.class
FileUploadingToS3Observer.class,
FileUploadingToGridFsObserver.class
};
private final Context appContext;
private final String serverConfigId;
......
package chat.rocket.android.service.observer;
import android.content.Context;
import android.net.Uri;
import bolts.Task;
import chat.rocket.android.api.DDPClientWraper;
import chat.rocket.android.api.FileUploadingHelper;
import chat.rocket.android.helper.LogcatIfError;
import chat.rocket.android.helper.OkHttpHelper;
import chat.rocket.android.log.RCLog;
import chat.rocket.android.model.SyncState;
import chat.rocket.android.model.ddp.User;
import chat.rocket.android.model.internal.FileUploading;
import chat.rocket.android.model.internal.Session;
import chat.rocket.android.realm_helper.RealmHelper;
import io.realm.Realm;
import io.realm.RealmResults;
import java.io.InputStream;
import java.util.List;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.json.JSONObject;
/**
* execute file uploading and requesting sendMessage with attachment.
*/
public class FileUploadingToGridFsObserver extends AbstractModelObserver<FileUploading> {
private FileUploadingHelper methodCall;
public FileUploadingToGridFsObserver(Context context, String hostname,
RealmHelper realmHelper, DDPClientWraper ddpClient) {
super(context, hostname, realmHelper, ddpClient);
methodCall = new FileUploadingHelper(realmHelper, ddpClient);
realmHelper.executeTransaction(realm -> {
// resume pending operations.
RealmResults<FileUploading> pendingUploadRequests = realm.where(FileUploading.class)
.equalTo("syncstate", SyncState.SYNCING)
.equalTo("storageType", FileUploading.STORAGE_TYPE_GRID_FS)
.findAll();
for (FileUploading req : pendingUploadRequests) {
req.setSyncstate(SyncState.NOT_SYNCED);
}
// clean up records.
realm.where(FileUploading.class)
.beginGroup()
.equalTo("syncstate", SyncState.SYNCED)
.or()
.equalTo("syncstate", SyncState.FAILED)
.endGroup()
.equalTo("storageType", FileUploading.STORAGE_TYPE_GRID_FS)
.findAll().deleteAllFromRealm();
return null;
}).continueWith(new LogcatIfError());
}
@Override public RealmResults<FileUploading> queryItems(Realm realm) {
return realm.where(FileUploading.class)
.equalTo("syncstate", SyncState.NOT_SYNCED)
.equalTo("storageType", FileUploading.STORAGE_TYPE_GRID_FS)
.findAll();
}
@Override public void onUpdateResults(List<FileUploading> results) {
if (results.isEmpty()) {
return;
}
List<FileUploading> uploadingList = realmHelper.executeTransactionForReadResults(realm ->
realm.where(FileUploading.class).equalTo("syncstate", SyncState.SYNCING).findAll());
if (uploadingList.size() >= 1) {
// do not upload multiple files simultaneously
return;
}
User currentUser = realmHelper.executeTransactionForRead(realm ->
User.queryCurrentUser(realm).findFirst());
Session session = realmHelper.executeTransactionForRead(realm ->
Session.queryDefaultSession(realm).findFirst());
if (currentUser == null || session == null) {
return;
}
final String cookie = String.format("rc_uid=%s; rc_token=%s",
currentUser.get_id(), session.getToken());
FileUploading fileUploading = results.get(0);
final String roomId = fileUploading.getRoomId();
final String uplId = fileUploading.getUplId();
final String filename = fileUploading.getFilename();
final long filesize = fileUploading.getFilesize();
final String mimeType = fileUploading.getMimeType();
final Uri fileUri = Uri.parse(fileUploading.getUri());
realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(FileUploading.class, new JSONObject()
.put("uplId", uplId)
.put("syncstate", SyncState.SYNCING)
)
).onSuccessTask(_task -> methodCall.ufsCreate(filename, filesize, mimeType, roomId)
).onSuccessTask(task -> {
final JSONObject info = task.getResult();
final String fileId = info.getString("fileId");
final String token = info.getString("token");
final String url = info.getString("url");
final int bufSize = 16384; //16KB
final byte[] buffer = new byte[bufSize];
int offset = 0;
final MediaType contentType = MediaType.parse(mimeType);
try (InputStream inputStream = context.getContentResolver().openInputStream(fileUri)) {
int read;
while ((read = inputStream.read(buffer)) > 0) {
offset += read;
double progress = 1.0 * offset / filesize;
Request request = new Request.Builder()
.url(url + "&progress=" + progress)
.header("Cookie", cookie)
.post(RequestBody.create(contentType, buffer, 0, read))
.build();
Response response = OkHttpHelper.getClientForUploadFile().newCall(request).execute();
if (response.isSuccessful()) {
final JSONObject obj = new JSONObject()
.put("uplId", uplId)
.put("uploadedSize", offset);
realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(FileUploading.class, obj));
} else {
return Task.forError(new Exception(response.message()));
}
}
}
return methodCall.ufsComplete(fileId, token);
}).onSuccessTask(task -> methodCall.sendFileMessage(roomId, null, task.getResult())
).onSuccessTask(task -> realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(FileUploading.class, new JSONObject()
.put("uplId", uplId)
.put("syncstate", SyncState.SYNCED)
.put("error", JSONObject.NULL)
)
)).continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.w(task.getError());
return realmHelper.executeTransaction(realm ->
realm.createOrUpdateObjectFromJson(FileUploading.class, new JSONObject()
.put("uplId", uplId)
.put("syncstate", SyncState.FAILED)
.put("error", task.getError().getMessage())
));
} else {
return Task.forResult(null);
}
});
}
}
......@@ -30,10 +30,10 @@ import org.json.JSONObject;
/**
* execute file uploading and requesting sendMessage with attachment.
*/
public class S3FileUploadingObserver extends AbstractModelObserver<FileUploading> {
public class FileUploadingToS3Observer extends AbstractModelObserver<FileUploading> {
private FileUploadingHelper methodCall;
public S3FileUploadingObserver(Context context, String hostname,
public FileUploadingToS3Observer(Context context, String hostname,
RealmHelper realmHelper, DDPClientWraper ddpClient) {
super(context, hostname, realmHelper, ddpClient);
methodCall = new FileUploadingHelper(realmHelper, ddpClient);
......@@ -42,7 +42,7 @@ public class S3FileUploadingObserver extends AbstractModelObserver<FileUploading
// resume pending operations.
RealmResults<FileUploading> pendingUploadRequests = realm.where(FileUploading.class)
.equalTo("syncstate", SyncState.SYNCING)
.equalTo("repository", FileUploading.TO_S3)
.equalTo("storageType", FileUploading.STORAGE_TYPE_S3)
.findAll();
for (FileUploading req : pendingUploadRequests) {
req.setSyncstate(SyncState.NOT_SYNCED);
......@@ -55,7 +55,7 @@ public class S3FileUploadingObserver extends AbstractModelObserver<FileUploading
.or()
.equalTo("syncstate", SyncState.FAILED)
.endGroup()
.equalTo("repository", FileUploading.TO_S3)
.equalTo("storageType", FileUploading.STORAGE_TYPE_S3)
.findAll().deleteAllFromRealm();
return null;
}).continueWith(new LogcatIfError());
......@@ -64,7 +64,7 @@ public class S3FileUploadingObserver extends AbstractModelObserver<FileUploading
@Override public RealmResults<FileUploading> queryItems(Realm realm) {
return realm.where(FileUploading.class)
.equalTo("syncstate", SyncState.NOT_SYNCED)
.equalTo("repository", FileUploading.TO_S3)
.equalTo("storageType", FileUploading.STORAGE_TYPE_S3)
.findAll();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment