
salesforce-bulk's Introduction


Salesforce Bulk

Python client library for accessing the asynchronous Salesforce.com Bulk API.

Installation

pip install salesforce-bulk

Authentication

To access the Bulk API you need to authenticate a user into Salesforce. The easiest way to do this is to supply a username, password, and security token. This library uses the simple-salesforce package to handle password-based authentication.

from salesforce_bulk import SalesforceBulk

bulk = SalesforceBulk(username=username, password=password, security_token=security_token)
...

Alternatively, if you have access to a session ID and instance_url you can use those directly:

from urllib.parse import urlparse  # on Python 2: from urlparse import urlparse
from salesforce_bulk import SalesforceBulk

bulk = SalesforceBulk(sessionId=sessionId, host=urlparse(instance_url).hostname)
...

Operations

The basic sequence for driving the Bulk API is:

  1. Create a new job
  2. Add one or more batches to the job
  3. Close the job
  4. Wait for each batch to finish

Bulk Query

bulk.create_query_job(object_name, contentType='JSON')

Using API v39.0 or higher, you can also use the queryAll operation:

bulk.create_queryall_job(object_name, contentType='JSON')

Example

import json
from time import sleep

from salesforce_bulk.util import IteratorBytesIO

job = bulk.create_query_job("Contact", contentType='JSON')
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    result = json.load(IteratorBytesIO(result))
    for row in result:
        print(row)  # dictionary rows

Same example but for CSV:

import unicodecsv
from time import sleep

job = bulk.create_query_job("Contact", contentType='CSV')
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    reader = unicodecsv.DictReader(result, encoding='utf-8')
    for row in reader:
        print(row) # dictionary rows

Note that while CSV is the default for historical reasons, JSON should be preferred, since CSV has some drawbacks, including its handling of NULL versus empty string.
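To illustrate that difference (a sketch, not part of the original README; the record Ids below are made up): in a CSV batch an empty value leaves the field untouched and the sentinel #N/A sets it to null, whereas JSON can express null directly.

# Hypothetical update rows illustrating null handling in each content type.
csv_rows = [
    {"Id": "003xx0000000001", "Description": ""},      # empty: field left unchanged
    {"Id": "003xx0000000002", "Description": "#N/A"},  # CSV sentinel for null
]
json_rows = [
    {"Id": "003xx0000000003", "Description": None},    # serialized as JSON null
]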

PK Chunk Header

If you are querying a large number of records, you probably want to turn on PK chunking:

bulk.create_query_job(object_name, contentType='CSV', pk_chunking=True)

That will use the default setting for chunk size. You can use a different chunk size by providing a number of records per chunk:

bulk.create_query_job(object_name, contentType='CSV', pk_chunking=100000)

Additionally, if you want to do something more sophisticated, you can provide a raw header value:

bulk.create_query_job(object_name, contentType='CSV', pk_chunking='chunkSize=50000; startRow=00130000000xEftMGH')

Bulk Insert, Update, Delete

All bulk upload operations work the same way. You set the operation when you create the job, then submit one or more documents that specify records with columns to insert, update, or delete. When deleting, you should submit only the Id for each record.

For efficiency, you should use the post_batch method to post each batch of data. (Note that a batch can have a maximum of 10,000 records and be at most 1 GB in size.) You pass a generator or iterator into this function and it will stream the data to Salesforce via POST. For help sending CSV-formatted data, you can use the salesforce_bulk.CsvDictsAdapter class. It takes an iterator returning dictionaries and returns an iterator producing CSV data.

Full example:

from salesforce_bulk import CsvDictsAdapter

job = bulk.create_insert_job("Account", contentType='CSV')
accounts = [dict(Name="Account%d" % idx) for idx in range(5)]
csv_iter = CsvDictsAdapter(iter(accounts))
batch = bulk.post_batch(job, csv_iter)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)
print("Done. Accounts uploaded.")

Concurrency mode

When creating the job, pass concurrency='Serial' or concurrency='Parallel' to set the concurrency mode for the job.
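For example, to force the job's batches to be processed serially (a minimal sketch):

job = bulk.create_insert_job("Account", contentType='CSV', concurrency='Serial')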

salesforce-bulk's People

Contributors

alexhughson, alouie-sfdc, codingjoe, cvermilion, jer-tx, jmalonzo, lambacck, mjrossi, mnaberez, nikitakothari, overset, scottpersinger, siraj-h, snorf, svc-scm, vividboarder


salesforce-bulk's Issues

salesforce_bulk API when returns the data it converts the timestamp columns into bigint type

Here is some sample code. Notice that the SystemModstamp and LastModifiedDate column values are converted from timestamps into bigint. How can we preserve the original data type in the returned result?

import json
import time

from salesforce_bulk import SalesforceBulk
from salesforce_bulk.util import IteratorBytesIO

bulk = SalesforceBulk(username="[email protected]", password="xxxxxxxxx", security_token="xxxxxxxxxxxxxxxxxxxxxx")
# Create a new job
job = bulk.create_query_job("Contact", contentType='JSON')

# Add one or more batches to the job
batch = bulk.query(job, "select Id, LastName, LastModifiedDate, SystemModstamp from Contact where SystemModstamp < LAST_N_DAYS:20")

# Close the job
bulk.close_job(job)

# Wait for each batch to finish
while not bulk.is_batch_done(batch):
    time.sleep(10)

results = []
for result in bulk.get_all_results_for_query_batch(batch):
    results.append(json.load(IteratorBytesIO(result)))

for row in results:
    print(row)

Sample output:

{'attributes': {'type': 'Contact', 'url':
'/services/data/v40.0/sobjects/Contact/0034400001sdfeIaAAI'}, 'Id':
'0034400001sdfeIaAAI', 'LastName': 'obrien', 'LastModifiedDate':
1539024446000, 'SystemModstamp': 1539024446000}
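One possible client-side workaround, not from the issue thread: the JSON results carry datetimes as epoch milliseconds, so they can be converted back with the standard library.

from datetime import datetime, timezone

# Sketch: convert the epoch-millisecond value from the bulk JSON result back
# into a timezone-aware datetime.
millis = 1539024446000
modstamp = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
print(modstamp)  # 2018-10-08 18:47:26+00:00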

PyPI package is not up to date

@lambacck

I've been trying to use the salesforce-bulkipy library but have only run into errors when running it under a Python 3.5 environment. I can't create issues under that project directly, so I'm doing it here.

I've checked out the source that gets pulled into my environment by the pip install command, but it is different from what's specified here. Specifically, it seems to be missing all of the Python 3.x support.

See Python logs for details

If I execute the same script under a 2.7 environment, it works. I stumbled onto this when I had to make some of the same changes that were performed on the real master branch and noticed the versions were different. I was thinking of making a PR for them anyway, but it seems I should have gotten those changes in the first place.

support for Python 3

Traceback (most recent call last):
  File "reconcile.py", line 5, in <module>
    from salesforce_bulk import SalesforceBulk
  File "/usr/local/lib/python3.4/dist-packages/salesforce_bulk/__init__.py", line 1, in <module>
    from salesforce_bulk import SalesforceBulk
ImportError: cannot import name 'SalesforceBulk'

Can't Access Campaign and CampaignMember objects

When trying to upsert records to Campaign and CampaignMember objects, I get an error saying field names are not found.

Error: raise BulkBatchFailed(job_id, batch_id, status.get('stateMessage'), batch_state) salesforce_bulk.salesforce_bulk.BulkBatchFailed: Batch 7514x000004NePpAAK of job 7504x000004Qc36AAC failed: InvalidBatch : Field name not found : Id

More pk chunking handling

  • Prevent PK chunking against unsupported object types.
  • Automatically use the parent object for Share objects.

get_all_results_for_query_batch extremely slow.

Hi all,
I am fetching ~10m records from a SF object using sf_bulk.create_query_job(sf_object_name, contentType="CSV", concurrency='Parallel', pk_chunking=True). All of the chunks finish in a reasonable amount of time. Then, when it comes time to get all of the records from each chunk (~200,000 per chunk), I use a list comprehension with get_all_results_for_query_batch() to get all of the results and return a list.

records_comprehension = [
    record.decode("utf-8")
    for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job)
    for record in IteratorBytesIO(chunk)
]

Doing this for ~200,000 items takes 20+ minutes and uses ~15% of my 8-core CPU (I know, I know, Python and multi-core and all that; just an interesting number, since 12.5% is 100% usage of one core). As you can imagine, that's not great when trying to get 10m items. Right now I think I am being limited by single-core performance during the creation of records_comprehension, specifically the record.decode("utf-8") call, if I had to guess.

I am planning to parallelize the entire program into 10 or so different processes to maximize the CPU resources I have available.

Other than parallelizing the consumption of chunks, is there something I am doing wrong? Is there a better way to go about this? The examples essentially do it this way, but with a double for loop instead of a comprehension (a comprehension should be faster).
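For reference, the README's CSV example feeds each result stream straight into unicodecsv.DictReader with no per-record decode; a sketch of applying that here (untested against this exact setup, using the same sf_bulk, batch_id, and job names as above):

import unicodecsv

# Sketch: let the CSV reader consume each chunk's file-like result lazily
# instead of decoding individual records by hand.
records = []
for chunk in sf_bulk.get_all_results_for_query_batch(batch_id, job):
    reader = unicodecsv.DictReader(chunk, encoding='utf-8')
    records.extend(reader)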

PS. When I get my code working as efficiently as possible I will submit a PR with a PK chunk example and some more function docs. Thank you Christopher Lambacher for making this project!

Documentation on env vars

Where is the documentation for the environment variables?

ajung@blackmoon2:~/src/ableton> bin/python bulk.py
Traceback (most recent call last):
  File "bulk.py", line 3, in <module>
    client = get_client_bulk('test')
  File "/data/home/ajung/src/ableton/util.py", line 27, in get_client_bulk
    return SalesforceBulk(username=username, password=password + token)
  File "/data/home/ajung/src/ableton/lib/python2.7/site-packages/salesforce_bulk/salesforce_bulk.py", line 60, in __init__
    username, password)
  File "/data/home/ajung/src/ableton/lib/python2.7/site-packages/salesforce_bulk/salesforce_bulk.py", line 87, in login_to_salesforce
    ', '.join(missing_env_vars)))
RuntimeError: You must set SALESFORCE_CLIENT_ID, SALESFORCE_CLIENT_SECRET, SALESFORCE_REDIRECT_URI to use username/pass login

Deprecate in favour of simple-salesforce

Hey!
I remember using salesforce-bulk way back in the day; it was great, but it doesn't seem to be updated much anymore. We've been using simple-salesforce in production for a number of years and it's been very solid and has continued support (1.0 is planned - simple-salesforce/simple-salesforce#364). Also, it goes beyond just bulk (it does support it - simple-salesforce/simple-salesforce@e9433e6); it has a lot of SFDC introspection, non-bulk APIs, etc.

Do you think that for the sake of consolidation in this space, you might consider archiving this in favour of simple-salesforce? Or, if not, somehow describe this library's future in the docs, so that it's clearer to its users?

cc @lambacck

pk_chunking - original batch

Hi - I'm having some trouble using the pk_chunking option on Salesforce API v42.
The Salesforce docs specify that 'When a query is successfully chunked, the original batch's status shows as NOT_PROCESSED.' However, salesforce-bulk appears to check the status of all batches, sees the one with status NOT_PROCESSED, and raises a BulkBatchFailed error.
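A sketch of one workaround, not from the issue thread: poll the original batch with batch_status() and treat NotProcessed as an expected terminal state when PK chunking is on, rather than calling is_batch_done() on it. The batch and job names below are hypothetical.

from time import sleep

# Sketch: 'batch' is the id returned by bulk.query() and 'job' the chunked
# query job; NotProcessed is the expected final state of that original batch.
state = bulk.batch_status(batch, job).get('state')
while state not in ('Completed', 'NotProcessed', 'Failed'):
    sleep(10)
    state = bulk.batch_status(batch, job).get('state')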

urlparse returning bad query errors, not able to login

I am passing the username and the password to the SalesforceBulk class but am blocked by this error:

Traceback (most recent call last):
  File "D:\pydevWorkplace\SALESFORCEBIN\BinMiser\LoginModule.py", line 6, in <module>
    import salesforce_oauth_request
  File "C:\Python35\lib\site-packages\salesforce_oauth_request\__init__.py", line 4, in <module>
    from .utils import login, oauth_flow, token_login
  File "C:\Python35\lib\site-packages\salesforce_oauth_request\utils.py", line 9, in <module>
    import urlparse
  File "C:\Python35\lib\urlparse.py", line 418
    raise ValueError, "bad query field: %r" % (name_value,)
                    ^
SyntaxError: invalid syntax

I don't know why this is happening. I am following the documentation.

package requires security token even when API doesn't

In salesforce_bulk.py#login_to_salesforce, line 125, this package requires a security token in order to attempt authentication. Instead, this package should let the API reject the auth call.

I am currently allowlisting the IP address my requests are coming from, so I do not need to pass in my security token as far as the Salesforce API is concerned. I would prefer to not keep a security token on the server these requests are coming from.

My current workaround is this, which now works just fine, but is odd:

    passwd = SALESFORCE_PASSWORD
    token = SALESFORCE_SECURITY_TOKEN
    if not len(token):
        passwd, token = passwd[:-5], passwd[-5:]

    bulk = SalesforceBulk(
        username=SALESFORCE_USERNAME,
        password=passwd,
        security_token=token,
        sandbox=SALESFORCE_IS_SANDBOX,
        client_id=SALESFORCE_CLIENT_ID,
    )

is_batch_done() takes exactly 3 arguments (2 given)

Python 2.7

from salesforce_bulk import SalesforceBulk
import json

bulk = SalesforceBulk(sessionId='', host='')

job = bulk.create_query_job("Contact", contentType='CSV')
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    result = json.load(IteratorBytesIO(result))
    for row in result:
        print row # dictionary rows
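A sketch of a possible fix, not a confirmed one: older releases evidently require both a batch id and a job id. In the current source the order appears to be is_batch_done(batch_id, job_id), per the signature shown in another traceback on this page, but the argument order may differ between releases, so verify against your installed salesforce_bulk.py.

from time import sleep

# Sketch: pass both ids; check your installed version for the argument order.
while not bulk.is_batch_done(batch, job):
    sleep(10)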

Cannot read all Salesforce fields using salesforce_bulk

Hi All,
I'm not sure if this is an issue with the library.

I'm using the Python salesforce_bulk library to extract data from Salesforce.

If I attempt to read all fields of some tables, an INVALID_FIELD error is returned. Using the Contact table as an example, I first perform a describe on the object to return all fields, then use that field list to query Salesforce. This returns an error giving me a list of invalid fields.

The query works once those fields are removed. I'm sure my account has access to those fields because I can extract them using the synchronous (non-bulk) query-all API, which uses the requests library.

I know compound fields are not supported by the bulk API so those have all been removed.

List of invalid fields from the CONTACT table:

Column               Salesforce data type
EmailBouncedDate     DateTime
IsEmailBounced       Bool
PhotoUrl             V_WString
Jigsaw               V_WString
JigsawContactId      V_WString
IndividualId         V_WString
ActivityMetricId     V_WString
Preferred_Name__c    V_WString
Any advice would be greatly appreciated. Thank you

Bulk.insert Error

Hi

I am testing this with 100 Account rows and a batch size equal to 200. Everything works fine from login to upload, but during the process it continuously shows me the following error:

salesforce-processing/Bulk.insert: Failure in processing file: Account.csv on pod: test-dev. Details: SFHelper.status: Call was successful, but some rows failed. Details: [{"batchId": "Id", "error": "Error"}]

Even after getting this error, every time I run it all the records get uploaded to Salesforce without any issues.

Can anyone help me figure out why I am getting this error over and over again when there is nothing wrong with the records uploaded to Salesforce?

Compression for responses

I would like to request gzip-compressed JSON results from the Salesforce Bulk API, like this:
job = bulk.create_query_job(sfObject, contentType='JSON', contentEncoding='gzip')

The documentation mentions that passing Content-Encoding: gzip in the header of the request enables this.

I can see that contentEncoding is not among the parameters of the create_query_job() function, but in the default headers something close is initialized:

Looking at the source, it seems that Accept-Encoding is used instead:

    def headers(self, values={}, content_type='application/xml'):
        default = {
            "X-SFDC-Session": self.sessionId,
            "Content-Type": "{}; charset=UTF-8".format(content_type),
            'Accept-Encoding': "gzip",
        }
        default.update(values)
        return default

Am I misunderstanding this or should it be

    def headers(self, values={}, content_type='application/xml'):
        default = {
            "X-SFDC-Session": self.sessionId,
            "Content-Type": "{}; charset=UTF-8".format(content_type),
            'Content-Encoding': "gzip",
        }
        default.update(values)
        return default

With kind regards and thanks for all the effort,
R

Add httplib2 to setup.py

I'm trying to install salesforce-bulk by running requirements.txt and it errors because it cannot find httplib2. I'm currently having to install httplib2 manually for everything to work, because simply adding it to requirements.txt doesn't seem to do the trick.

I might be missing something, but why is this library not added to salesforce-bulk's setup.py file?

Thanks,
Travis

Query regarding new features

Hey @lambacck, I'm the author of https://github.com/wingify/salesforce-bulkipy . We created this library primarily because of:

  • Added support for Two-Factor Authentication by routing authentication via simple-salesforce
  • Added support for Salesforce Sandbox
  • Added support for parsing unicode characters in CSV
  • Explicit Upsert Support
  • Fixed various other bugs
  • Python 3 support

Has salesforce-bulk integrated all of these features? If so, my fork library can be deemed redundant and I would like to add a link to its README that redirects back to yours :)

streaming example

Would you mind posting an example for when stream=True? Does this essentially require an additional loop to pull subsequent requests, or is this handled by the existing call to iter_lines?

Release Version 2.0.0 To pypi

Open tasks/questions:

  • Is the API change significant enough to require a new package name?
  • Better documentation/changelog

get_batch_results() got an unexpected keyword argument 'callback'

I'm getting an error when trying to delete records.

File "./bulk_refresh_liveops_users.py", line 75, in main
batch = bulk.bulk_delete(job, "LiveOps__c", "Deliv_ID__c <> '0'")
File "/usr/local/lib/python2.7/site-packages/salesforce_bulk/salesforce_bulk.py", line 317, in bulk_delete
query_job_id, query_batch_id, callback=save_results)
TypeError: get_batch_results() got an unexpected keyword argument 'callback'

"Invalid session id" with oauth access token valid for REST

I am using a Salesforce OAuth access token for my developer Salesforce account, which I know is a valid token because I can query the REST API with it.

But when I do

from urlparse import urlparse
from salesforce_bulk import SalesforceBulk

# `token` is that valid oauth token I mentioned, which works to make calls to REST API
# `instance_url` is something like 'https://na40.salesforce.com'
bulk = SalesforceBulk(sessionId=token, host=urlparse(instance_url).hostname)

# this line then errors
job = bulk.create_insert_job("Contact", contentType='CSV')

"""
BulkApiError: [400] Bulk API HTTP Error result: <?xml version="1.0" encoding="UTF-8"?><error
   xmlns="http://www.force.com/2009/06/asyncapi/dataload">
 <exceptionCode>InvalidSessionId</exceptionCode>
 <exceptionMessage>Invalid session id</exceptionMessage>
</error>
"""

I've seen a lot of questions about the Bulk API and seemingly valid tokens that cause that error:
yatish27/salesforce_bulk_api#33
https://salesforce.stackexchange.com/questions/136150/bulk-api-rejects-seemingly-valid-session-id
https://developer.salesforce.com/forums/?id=906F0000000fyaLIAQ
https://success.salesforce.com/answers?id=9063A000000iUovQAE

But I can't work out what the real answer is. Might this have to do with this Python library, or is it an account settings issue (or a known API bug)?

Example for delete?

My workflow is as follows:

  1. upload data (through this package)
  2. process on salesforces' end
  3. get back results.

I'm getting the IDs of the created objects from

results = bulk.get_batch_results(batch_id)

then iterating through that and appending to a list (listIds). Later, when I try to do step 3:

newJob = bulk.create_delete_job("Custom_Obj__c")
newBatch = bulk.post_batch(newJob, iter(listIds))
bulk.wait_for_batch(newJob, newBatch)
bulk.close_job(newJob)
print("Successfully deleted all records that were uploaded")

I get the error of

salesforce_bulk.salesforce_bulk.BulkBatchFailed: Batch 7513F000000GpvjQAC of job 7503F0000002GXcQAM failed: InvalidBatch : The 'delete' batch must contain only ids

but as far as I know it is only IDs? Granted, it's a list of IDs that I ran iter() on, but still.
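One likely fix, a sketch rather than an answer from the thread: post the ids as CSV rows with an Id header, for example via the CsvDictsAdapter shown in the README, instead of an iterator of bare id strings.

from salesforce_bulk import CsvDictsAdapter

# Sketch: wrap each id in a dict so the posted batch is a CSV document with an
# "Id" column header, which is what the delete job expects.
newJob = bulk.create_delete_job("Custom_Obj__c")
csv_iter = CsvDictsAdapter({"Id": rec_id} for rec_id in listIds)
newBatch = bulk.post_batch(newJob, csv_iter)
bulk.wait_for_batch(newJob, newBatch)
bulk.close_job(newJob)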

getting the Null byte exception while running query

    recs = [r for r in records]
  File "/usr/local/lib/python2.7/dist-packages/salesforce_bulk/salesforce_bulk.py", line 419, in get_batch_results
    for i, line in enumerate(iterator):
Error: line contains NULL byte

Logon failed,

Deprecation warning due to invalid escape sequences in Python 3.8

find . -iname '*.py'  | xargs -P 4 -I{} python3.8 -Wall -m py_compile {}

./salesforce_bulk/salesforce_bulk.py:302: DeprecationWarning: invalid escape sequence \w
  re.search(re.compile("from (\w+)", re.I), soql).group(1),
./salesforce_bulk/tests/test_salesforce_bulk.py:270: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", job_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:273: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", batch_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:297: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", job_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:301: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", batch_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:352: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", job_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:365: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", batch_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:391: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", job_id))
./salesforce_bulk/tests/test_salesforce_bulk.py:428: DeprecationWarning: invalid escape sequence \w
  self.assertIsNotNone(re.match("\w+", batch_id))

IndexError in get_batch_result_iter

Happens when trying to update the results from an upsert job.

It seems the second request to fetch the result data is not necessary, since the response data from the first one already contains the final results for each upserted object, instead of the intermediate response with the result ids.

So when we try to extract the result id in line 505 (result_id = r.text.split("<result>")[1].split("</result>")[0]), it blows up, since there is no XML tag at all.

Here's an example I've just run:

uri = self.endpoint + "/job/%s/batch/%s/result" % (job_id, batch_id)
r = requests.get(uri, headers=self.headers(), stream=True)
print(r.text)
>>> u'"Id","Success","Created","Error"\n"701o0000000kiShAAI","true","true",""\n'

I assume this behaviour is not observed with other kinds of jobs, so in that case I guess we could check whether the text contains the ids or the full result set.
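A sketch of the guard the reporter suggests (hypothetical, not merged code):

# Sketch: only issue the second request when the first response actually
# contains <result> ids; otherwise its body already is the final result set.
if "<result>" in r.text:
    result_id = r.text.split("<result>")[1].split("</result>")[0]
    # ...fetch /result/<result_id> as the current code does...
else:
    # ...treat r's body itself as the final CSV result...
    pass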

Documentation example does not work

The documentation example talks about using the function get_batch_result_iter:

for row in bulk.get_batch_result_iter(job, batch, parse_csv=True):
    print row  # row is a dict

This is not correct. This function is not even getting the correct data. Instead of going to something like
https://ap1.salesforce.com/services/async/29.0/job/75090000001HbevAAC/batch/751900000028eiqAAA/result/752900000009oag

it is trying to get
https://ap1.salesforce.com/services/async/29.0/job/75090000001HbevAAC/batch/751900000028eiqAAA/result/

which is incorrect.

Documentation and examples

There's a need to document the code and to update the README/Wiki with examples.
Since I have gone through the code and worked with it, I can put in some improvements and submit a PR.

pk_chunking is not working

Hi

I tried the example in the documentation

job = bulk.create_query_job("Contact", contentType='CSV', pk_chunking=True)
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    reader = unicodecsv.DictReader(result, encoding='utf-8')
    for row in reader:
        print(row) # dictionary rows

BulkBatchFailed                           Traceback (most recent call last)
in
      8 batch = bulk.query(job, "select Id,LastName from Contact")
      9 bulk.close_job(job)
---> 10 while not bulk.is_batch_done(batch):
     11     sleep(10)
     12

c:\python37\lib\site-packages\salesforce_bulk\salesforce_bulk.py in is_batch_done(self, batch_id, job_id)
    426         if batch_state in bulk_states.ERROR_STATES:
    427             status = self.batch_status(batch_id, job_id)
--> 428             raise BulkBatchFailed(job_id, batch_id, status.get('stateMessage'), batch_state)
    429         return batch_state == bulk_states.COMPLETED
    430

BulkBatchFailed: Batch 123456789 of job None failed: None

Without pk_chunking it works, though.
Any ideas what I am doing wrong here? Any help would be appreciated.

Fix README examples (causing low performance)

Hi,

There is an issue in README examples:
https://github.com/heroku/salesforce-bulk/blame/main/README.rst#L66
https://github.com/heroku/salesforce-bulk/blame/main/README.rst#L75

IteratorBytesIO is not needed because the function already returns one:

return util.IteratorBytesIO(iter)

What I've noticed is that when I run result = json.load(IteratorBytesIO(result)) instead of result = json.load(result), my script runs for 35 minutes instead of 2. This can cause problems for other users as well, as seen in the open issues.
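With that change, the README's JSON loop would read:

for result in bulk.get_all_results_for_query_batch(batch):
    result = json.load(result)  # already an IteratorBytesIO, no extra wrapping
    for row in result:
        print(row)  # dictionary rows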

Connection failing after some time

Traceback (most recent call last):
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connection.py", line 157, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\util\connection.py", line 61, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno 11002] getaddrinfo failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "d:/DataDeletion/datacreatorusingbulk.py", line 30, in
    batch = bulk.post_batch(job, csv_itr)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\salesforce_bulk\salesforce_bulk.py", line 334, in post_batch
    resp = requests.post(uri, data=data_generator, headers=headers)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\requests\api.py", line 119, in post
    return request('post', url, data=data, json=json, **kwargs)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\requests\api.py", line 61, in request
    return session.request(method=method, url=url, **kwargs)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\requests\sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\requests\sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\requests\adapters.py", line 467, in send
    low_conn.endheaders()
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\http\client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\http\client.py", line 1026, in _send_output
    self.send(msg)
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\http\client.py", line 966, in send
    self.connect()
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connection.py", line 300, in connect
    conn = self._new_conn()
  File "C:\Users\lingampet.kartheek\AppData\Local\Programs\Python\Python37\lib\site-packages\urllib3\connection.py", line 169, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.VerifiedHTTPSConnection object at 0x0000029D278CD848>: Failed to establish a new connection: [Errno 11002] getaddrinfo failed

The batch is stopped after some time due to the above error.

Logger errors when getting batch results

The following lines in get_batch_result_iter are not correct and throw exceptions:

logger("Bulk batch %s processed %d records" % (batch_id, status['numberRecordsProcessed']))

logger("Bulk batch %s had %d failed records" % (batch_id, failed))

There are two problems here:

  1. logger should be logger.debug()
  2. The %d directive is incorrect, as the status contains these values as strings
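The corrected calls would look roughly like this (a sketch of the two points above, using %s since the status values arrive as strings):

logger.debug("Bulk batch %s processed %s records",
             batch_id, status['numberRecordsProcessed'])

logger.debug("Bulk batch %s had %s failed records", batch_id, failed)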

Update Bulk Query Example on Readme

The example in the README does not work for me on Python 3.4.

Here's an example that does work for me -- just sharing.

import codecs
import json

from salesforce_bulk.util import IteratorBytesIO

reader = codecs.getreader("utf-8")

for result in bulk.get_all_results_for_query_batch(batch):
    converted_result = IteratorBytesIO(result)
    result_list = json.load(reader(converted_result))
    for row in result_list:
        print(row) # dictionary rows

Error: No module named util

from salesforce_bulk.util import IteratorBytesIO
ImportError: No module named util

I am using Python 2.7.

I installed this package with pip install salesforce-bulk.
